From 7306683306c971eafc937df23dce5b3ed2569f20 Mon Sep 17 00:00:00 2001 From: Bugra Gedik Date: Thu, 1 Aug 2024 17:15:54 +0000 Subject: [PATCH 01/29] Add environment variable for pod name Signed-off-by: Bugra Gedik --- .../pluginmachinery/flytek8s/k8s_resource_adds.go | 8 ++++++++ .../pluginmachinery/flytek8s/k8s_resource_adds_test.go | 10 +++++----- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go index 34e13adfa8..b0025fdddf 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go @@ -60,6 +60,14 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID, consoleURL string) []v1 Name: "FLYTE_INTERNAL_EXECUTION_DOMAIN", Value: nodeExecutionID.Domain, }, + { + Name: "FLYTE_INTERNAL_POD_NAME", + ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }, { Name: "FLYTE_ATTEMPT_NUMBER", Value: attemptNumber, diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds_test.go index 4015a8d9b8..fd4828fbbd 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds_test.go @@ -27,13 +27,13 @@ func TestGetExecutionEnvVars(t *testing.T) { }{ { "no-console-url", - 12, + 13, "", nil, }, { "with-console-url", - 13, + 14, "scheme://host/path", &v12.EnvVar{ Name: "FLYTE_EXECUTION_URL", @@ -42,7 +42,7 @@ func TestGetExecutionEnvVars(t *testing.T) { }, { "with-console-url-ending-in-single-slash", - 13, + 14, "scheme://host/path/", &v12.EnvVar{ Name: "FLYTE_EXECUTION_URL", @@ -51,7 +51,7 @@ func TestGetExecutionEnvVars(t *testing.T) { }, { "with-console-url-ending-in-multiple-slashes", - 13, + 14, 
"scheme://host/path////", &v12.EnvVar{ Name: "FLYTE_EXECUTION_URL", @@ -63,7 +63,7 @@ func TestGetExecutionEnvVars(t *testing.T) { envVars := GetExecutionEnvVars(mock, tt.consoleURL) assert.Len(t, envVars, tt.expectedEnvVars) if tt.expectedEnvVar != nil { - assert.True(t, proto.Equal(&envVars[4], tt.expectedEnvVar)) + assert.True(t, proto.Equal(&envVars[5], tt.expectedEnvVar)) } } } From dd2957b7696d6ae12317a92cff9136f7f8892843 Mon Sep 17 00:00:00 2001 From: Jason Parraga Date: Thu, 1 Aug 2024 15:19:05 -0700 Subject: [PATCH 02/29] [flyteadmin] Refactor panic recovery into middleware (#5546) * Refactor panic handling to middleware Signed-off-by: Jason Parraga * Remove registration of old panicCounter Signed-off-by: Jason Parraga * Add test coverage Signed-off-by: Jason Parraga --------- Signed-off-by: Jason Parraga Signed-off-by: Bugra Gedik --- flyteadmin/pkg/rpc/adminservice/attributes.go | 10 --- flyteadmin/pkg/rpc/adminservice/base.go | 13 --- flyteadmin/pkg/rpc/adminservice/base_test.go | 40 --------- .../rpc/adminservice/description_entity.go | 2 - flyteadmin/pkg/rpc/adminservice/execution.go | 10 --- .../pkg/rpc/adminservice/launch_plan.go | 7 -- flyteadmin/pkg/rpc/adminservice/metrics.go | 7 +- .../middleware/recovery_interceptor.go | 61 +++++++++++++ .../middleware/recovery_interceptor_test.go | 90 +++++++++++++++++++ .../pkg/rpc/adminservice/named_entity.go | 3 - .../pkg/rpc/adminservice/node_execution.go | 6 -- flyteadmin/pkg/rpc/adminservice/project.go | 5 -- flyteadmin/pkg/rpc/adminservice/task.go | 4 - .../pkg/rpc/adminservice/task_execution.go | 4 - flyteadmin/pkg/rpc/adminservice/version.go | 1 - flyteadmin/pkg/rpc/adminservice/workflow.go | 4 - flyteadmin/pkg/server/service.go | 29 +++++- 17 files changed, 177 insertions(+), 119 deletions(-) delete mode 100644 flyteadmin/pkg/rpc/adminservice/base_test.go create mode 100644 flyteadmin/pkg/rpc/adminservice/middleware/recovery_interceptor.go create mode 100644 
flyteadmin/pkg/rpc/adminservice/middleware/recovery_interceptor_test.go diff --git a/flyteadmin/pkg/rpc/adminservice/attributes.go b/flyteadmin/pkg/rpc/adminservice/attributes.go index 46607da93e..62002a0e6e 100644 --- a/flyteadmin/pkg/rpc/adminservice/attributes.go +++ b/flyteadmin/pkg/rpc/adminservice/attributes.go @@ -12,7 +12,6 @@ import ( func (m *AdminService) UpdateWorkflowAttributes(ctx context.Context, request *admin.WorkflowAttributesUpdateRequest) ( *admin.WorkflowAttributesUpdateResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -30,7 +29,6 @@ func (m *AdminService) UpdateWorkflowAttributes(ctx context.Context, request *ad func (m *AdminService) GetWorkflowAttributes(ctx context.Context, request *admin.WorkflowAttributesGetRequest) ( *admin.WorkflowAttributesGetResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -48,7 +46,6 @@ func (m *AdminService) GetWorkflowAttributes(ctx context.Context, request *admin func (m *AdminService) DeleteWorkflowAttributes(ctx context.Context, request *admin.WorkflowAttributesDeleteRequest) ( *admin.WorkflowAttributesDeleteResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -66,7 +63,6 @@ func (m *AdminService) DeleteWorkflowAttributes(ctx context.Context, request *ad func (m *AdminService) UpdateProjectDomainAttributes(ctx context.Context, request *admin.ProjectDomainAttributesUpdateRequest) ( *admin.ProjectDomainAttributesUpdateResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -84,7 +80,6 @@ func (m *AdminService) 
UpdateProjectDomainAttributes(ctx context.Context, reques func (m *AdminService) GetProjectDomainAttributes(ctx context.Context, request *admin.ProjectDomainAttributesGetRequest) ( *admin.ProjectDomainAttributesGetResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -102,7 +97,6 @@ func (m *AdminService) GetProjectDomainAttributes(ctx context.Context, request * func (m *AdminService) DeleteProjectDomainAttributes(ctx context.Context, request *admin.ProjectDomainAttributesDeleteRequest) ( *admin.ProjectDomainAttributesDeleteResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -121,7 +115,6 @@ func (m *AdminService) DeleteProjectDomainAttributes(ctx context.Context, reques func (m *AdminService) UpdateProjectAttributes(ctx context.Context, request *admin.ProjectAttributesUpdateRequest) ( *admin.ProjectAttributesUpdateResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -140,7 +133,6 @@ func (m *AdminService) UpdateProjectAttributes(ctx context.Context, request *adm func (m *AdminService) GetProjectAttributes(ctx context.Context, request *admin.ProjectAttributesGetRequest) ( *admin.ProjectAttributesGetResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -159,7 +151,6 @@ func (m *AdminService) GetProjectAttributes(ctx context.Context, request *admin. 
func (m *AdminService) DeleteProjectAttributes(ctx context.Context, request *admin.ProjectAttributesDeleteRequest) ( *admin.ProjectAttributesDeleteResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -177,7 +168,6 @@ func (m *AdminService) DeleteProjectAttributes(ctx context.Context, request *adm func (m *AdminService) ListMatchableAttributes(ctx context.Context, request *admin.ListMatchableAttributesRequest) ( *admin.ListMatchableAttributesResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } diff --git a/flyteadmin/pkg/rpc/adminservice/base.go b/flyteadmin/pkg/rpc/adminservice/base.go index 5a2cb2ad89..8df2c595c7 100644 --- a/flyteadmin/pkg/rpc/adminservice/base.go +++ b/flyteadmin/pkg/rpc/adminservice/base.go @@ -5,8 +5,6 @@ import ( "fmt" "runtime/debug" - "github.com/golang/protobuf/proto" - "github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent" eventWriter "github.com/flyteorg/flyte/flyteadmin/pkg/async/events/implementations" "github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications" @@ -44,17 +42,6 @@ type AdminService struct { Metrics AdminMetrics } -// Intercepts all admin requests to handle panics during execution. 
-func (m *AdminService) interceptPanic(ctx context.Context, request proto.Message) { - err := recover() - if err == nil { - return - } - - m.Metrics.PanicCounter.Inc() - logger.Fatalf(ctx, "panic-ed for request: [%+v] with err: %v with Stack: %v", request, err, string(debug.Stack())) -} - const defaultRetries = 3 func NewAdminServer(ctx context.Context, pluginRegistry *plugins.Registry, configuration runtimeIfaces.Configuration, diff --git a/flyteadmin/pkg/rpc/adminservice/base_test.go b/flyteadmin/pkg/rpc/adminservice/base_test.go deleted file mode 100644 index 9b1cb626d5..0000000000 --- a/flyteadmin/pkg/rpc/adminservice/base_test.go +++ /dev/null @@ -1,40 +0,0 @@ -package adminservice - -import ( - "context" - "testing" - - "github.com/golang/protobuf/proto" - "github.com/stretchr/testify/assert" - - "github.com/flyteorg/flyte/flytestdlib/logger" - "github.com/flyteorg/flyte/flytestdlib/promutils" -) - -func Test_interceptPanic(t *testing.T) { - m := AdminService{ - Metrics: InitMetrics(promutils.NewTestScope()), - } - - ctx := context.Background() - - // Mute logs to avoid .Fatal() (called in interceptPanic) causing the process to close - assert.NoError(t, logger.SetConfig(&logger.Config{Mute: true})) - - func() { - defer func() { - if err := recover(); err != nil { - assert.Fail(t, "Unexpected error", err) - } - }() - - a := func() { - defer m.interceptPanic(ctx, proto.Message(nil)) - - var x *int - *x = 10 - } - - a() - }() -} diff --git a/flyteadmin/pkg/rpc/adminservice/description_entity.go b/flyteadmin/pkg/rpc/adminservice/description_entity.go index 1d08234051..bc2d794aed 100644 --- a/flyteadmin/pkg/rpc/adminservice/description_entity.go +++ b/flyteadmin/pkg/rpc/adminservice/description_entity.go @@ -13,7 +13,6 @@ import ( ) func (m *AdminService) GetDescriptionEntity(ctx context.Context, request *admin.ObjectGetRequest) (*admin.DescriptionEntity, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, 
status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -36,7 +35,6 @@ func (m *AdminService) GetDescriptionEntity(ctx context.Context, request *admin. } func (m *AdminService) ListDescriptionEntities(ctx context.Context, request *admin.DescriptionEntityListRequest) (*admin.DescriptionEntityList, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } diff --git a/flyteadmin/pkg/rpc/adminservice/execution.go b/flyteadmin/pkg/rpc/adminservice/execution.go index 919ed851a3..15caf5aa75 100644 --- a/flyteadmin/pkg/rpc/adminservice/execution.go +++ b/flyteadmin/pkg/rpc/adminservice/execution.go @@ -13,7 +13,6 @@ import ( func (m *AdminService) CreateExecution( ctx context.Context, request *admin.ExecutionCreateRequest) (*admin.ExecutionCreateResponse, error) { - defer m.interceptPanic(ctx, request) requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") @@ -32,7 +31,6 @@ func (m *AdminService) CreateExecution( func (m *AdminService) RelaunchExecution( ctx context.Context, request *admin.ExecutionRelaunchRequest) (*admin.ExecutionCreateResponse, error) { - defer m.interceptPanic(ctx, request) requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") @@ -51,7 +49,6 @@ func (m *AdminService) RelaunchExecution( func (m *AdminService) RecoverExecution( ctx context.Context, request *admin.ExecutionRecoverRequest) (*admin.ExecutionCreateResponse, error) { - defer m.interceptPanic(ctx, request) requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") @@ -70,7 +67,6 @@ func (m *AdminService) RecoverExecution( func (m *AdminService) CreateWorkflowEvent( ctx context.Context, request 
*admin.WorkflowExecutionEventRequest) (*admin.WorkflowExecutionEventResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -89,7 +85,6 @@ func (m *AdminService) CreateWorkflowEvent( func (m *AdminService) GetExecution( ctx context.Context, request *admin.WorkflowExecutionGetRequest) (*admin.Execution, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -107,7 +102,6 @@ func (m *AdminService) GetExecution( func (m *AdminService) UpdateExecution( ctx context.Context, request *admin.ExecutionUpdateRequest) (*admin.ExecutionUpdateResponse, error) { - defer m.interceptPanic(ctx, request) requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") @@ -126,7 +120,6 @@ func (m *AdminService) UpdateExecution( func (m *AdminService) GetExecutionData( ctx context.Context, request *admin.WorkflowExecutionGetDataRequest) (*admin.WorkflowExecutionGetDataResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -144,7 +137,6 @@ func (m *AdminService) GetExecutionData( func (m *AdminService) GetExecutionMetrics( ctx context.Context, request *admin.WorkflowExecutionGetMetricsRequest) (*admin.WorkflowExecutionGetMetricsResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -162,7 +154,6 @@ func (m *AdminService) GetExecutionMetrics( func (m *AdminService) ListExecutions( ctx context.Context, request *admin.ResourceListRequest) (*admin.ExecutionList, error) { - defer m.interceptPanic(ctx, request) if request == nil { return 
nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -180,7 +171,6 @@ func (m *AdminService) ListExecutions( func (m *AdminService) TerminateExecution( ctx context.Context, request *admin.ExecutionTerminateRequest) (*admin.ExecutionTerminateResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } diff --git a/flyteadmin/pkg/rpc/adminservice/launch_plan.go b/flyteadmin/pkg/rpc/adminservice/launch_plan.go index ff3c2480e0..1586c3f542 100644 --- a/flyteadmin/pkg/rpc/adminservice/launch_plan.go +++ b/flyteadmin/pkg/rpc/adminservice/launch_plan.go @@ -14,7 +14,6 @@ import ( func (m *AdminService) CreateLaunchPlan( ctx context.Context, request *admin.LaunchPlanCreateRequest) (*admin.LaunchPlanCreateResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -31,7 +30,6 @@ func (m *AdminService) CreateLaunchPlan( } func (m *AdminService) GetLaunchPlan(ctx context.Context, request *admin.ObjectGetRequest) (*admin.LaunchPlan, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -55,7 +53,6 @@ func (m *AdminService) GetLaunchPlan(ctx context.Context, request *admin.ObjectG } func (m *AdminService) GetActiveLaunchPlan(ctx context.Context, request *admin.ActiveLaunchPlanRequest) (*admin.LaunchPlan, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -73,7 +70,6 @@ func (m *AdminService) GetActiveLaunchPlan(ctx context.Context, request *admin.A func (m *AdminService) UpdateLaunchPlan(ctx context.Context, request *admin.LaunchPlanUpdateRequest) ( 
*admin.LaunchPlanUpdateResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -97,7 +93,6 @@ func (m *AdminService) UpdateLaunchPlan(ctx context.Context, request *admin.Laun func (m *AdminService) ListLaunchPlans(ctx context.Context, request *admin.ResourceListRequest) ( *admin.LaunchPlanList, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Empty request. Please rephrase.") } @@ -116,7 +111,6 @@ func (m *AdminService) ListLaunchPlans(ctx context.Context, request *admin.Resou func (m *AdminService) ListActiveLaunchPlans(ctx context.Context, request *admin.ActiveLaunchPlanListRequest) ( *admin.LaunchPlanList, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Empty request. Please rephrase.") } @@ -135,7 +129,6 @@ func (m *AdminService) ListActiveLaunchPlans(ctx context.Context, request *admin func (m *AdminService) ListLaunchPlanIds(ctx context.Context, request *admin.NamedEntityIdentifierListRequest) ( *admin.NamedEntityIdentifierList, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Empty request. 
Please rephrase.") } diff --git a/flyteadmin/pkg/rpc/adminservice/metrics.go b/flyteadmin/pkg/rpc/adminservice/metrics.go index 65c6b741f3..f770665ef6 100644 --- a/flyteadmin/pkg/rpc/adminservice/metrics.go +++ b/flyteadmin/pkg/rpc/adminservice/metrics.go @@ -2,8 +2,6 @@ package adminservice import ( - "github.com/prometheus/client_golang/prometheus" - "github.com/flyteorg/flyte/flyteadmin/pkg/rpc/adminservice/util" "github.com/flyteorg/flyte/flytestdlib/promutils" ) @@ -115,8 +113,7 @@ type descriptionEntityEndpointMetrics struct { } type AdminMetrics struct { - Scope promutils.Scope - PanicCounter prometheus.Counter + Scope promutils.Scope executionEndpointMetrics executionEndpointMetrics launchPlanEndpointMetrics launchPlanEndpointMetrics @@ -137,8 +134,6 @@ type AdminMetrics struct { func InitMetrics(adminScope promutils.Scope) AdminMetrics { return AdminMetrics{ Scope: adminScope, - PanicCounter: adminScope.MustNewCounter("handler_panic", - "panics encountered while handling requests to the admin service"), executionEndpointMetrics: executionEndpointMetrics{ scope: adminScope, diff --git a/flyteadmin/pkg/rpc/adminservice/middleware/recovery_interceptor.go b/flyteadmin/pkg/rpc/adminservice/middleware/recovery_interceptor.go new file mode 100644 index 0000000000..a0a699a4f0 --- /dev/null +++ b/flyteadmin/pkg/rpc/adminservice/middleware/recovery_interceptor.go @@ -0,0 +1,61 @@ +package middleware + +import ( + "context" + "runtime/debug" + + "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/flyteorg/flyte/flytestdlib/promutils" +) + +// RecoveryInterceptor is a struct for creating gRPC interceptors that handle panics in go +type RecoveryInterceptor struct { + panicCounter prometheus.Counter +} + +// NewRecoveryInterceptor creates a new RecoveryInterceptor with metrics under the provided scope +func 
NewRecoveryInterceptor(adminScope promutils.Scope) *RecoveryInterceptor { + panicCounter := adminScope.MustNewCounter("handler_panic", "panics encountered while handling gRPC requests") + return &RecoveryInterceptor{ + panicCounter: panicCounter, + } +} + +// UnaryServerInterceptor returns a new unary server interceptor for panic recovery. +func (ri *RecoveryInterceptor) UnaryServerInterceptor() grpc.UnaryServerInterceptor { + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ any, err error) { + + defer func() { + if r := recover(); r != nil { + ri.panicCounter.Inc() + logger.Errorf(ctx, "panic-ed for request: [%+v] to %s with err: %v with Stack: %v", req, info.FullMethod, r, string(debug.Stack())) + // Return INTERNAL to client with no info as to not leak implementation details + err = status.Errorf(codes.Internal, "") + } + }() + + return handler(ctx, req) + } +} + +// StreamServerInterceptor returns a new streaming server interceptor for panic recovery. 
+func (ri *RecoveryInterceptor) StreamServerInterceptor() grpc.StreamServerInterceptor { + return func(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { + + defer func() { + if r := recover(); r != nil { + ri.panicCounter.Inc() + logger.Errorf(stream.Context(), "panic-ed for stream to %s with err: %v with Stack: %v", info.FullMethod, r, string(debug.Stack())) + // Return INTERNAL to client with no info as to not leak implementation details + err = status.Errorf(codes.Internal, "") + } + }() + + return handler(srv, stream) + } +} diff --git a/flyteadmin/pkg/rpc/adminservice/middleware/recovery_interceptor_test.go b/flyteadmin/pkg/rpc/adminservice/middleware/recovery_interceptor_test.go new file mode 100644 index 0000000000..3928856067 --- /dev/null +++ b/flyteadmin/pkg/rpc/adminservice/middleware/recovery_interceptor_test.go @@ -0,0 +1,90 @@ +package middleware + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + mockScope "github.com/flyteorg/flyte/flytestdlib/promutils" +) + +func TestRecoveryInterceptor(t *testing.T) { + ctx := context.Background() + testScope := mockScope.NewTestScope() + recoveryInterceptor := NewRecoveryInterceptor(testScope) + unaryInterceptor := recoveryInterceptor.UnaryServerInterceptor() + streamInterceptor := recoveryInterceptor.StreamServerInterceptor() + unaryInfo := &grpc.UnaryServerInfo{} + streamInfo := &grpc.StreamServerInfo{} + req := "test-request" + + t.Run("unary should recover from panic", func(t *testing.T) { + _, err := unaryInterceptor(ctx, req, unaryInfo, func(ctx context.Context, req any) (any, error) { + panic("synthetic") + }) + expectedErr := status.Errorf(codes.Internal, "") + require.Error(t, err) + require.Equal(t, expectedErr, err) + }) + + t.Run("stream should recover from panic", func(t *testing.T) { + 
stream := testStream{} + err := streamInterceptor(nil, &stream, streamInfo, func(srv any, stream grpc.ServerStream) error { + panic("synthetic") + }) + expectedErr := status.Errorf(codes.Internal, "") + require.Error(t, err) + require.Equal(t, expectedErr, err) + }) + + t.Run("unary should plumb response without panic", func(t *testing.T) { + mockedResponse := "test" + resp, err := unaryInterceptor(ctx, req, unaryInfo, func(ctx context.Context, req any) (any, error) { + return mockedResponse, nil + }) + require.NoError(t, err) + require.Equal(t, mockedResponse, resp) + }) + + t.Run("stream should plumb response without panic", func(t *testing.T) { + stream := testStream{} + handlerCalled := false + err := streamInterceptor(nil, &stream, streamInfo, func(srv any, stream grpc.ServerStream) error { + handlerCalled = true + return nil + }) + require.NoError(t, err) + require.True(t, handlerCalled) + }) +} + +// testStream is an implementation of grpc.ServerStream for testing. +type testStream struct { +} + +func (s *testStream) SendMsg(m interface{}) error { + return nil +} + +func (s *testStream) RecvMsg(m interface{}) error { + return nil +} + +func (s *testStream) SetHeader(metadata.MD) error { + return nil +} + +func (s *testStream) SendHeader(metadata.MD) error { + return nil +} + +func (s *testStream) SetTrailer(metadata.MD) {} + +func (s *testStream) Context() context.Context { + return context.Background() +} diff --git a/flyteadmin/pkg/rpc/adminservice/named_entity.go b/flyteadmin/pkg/rpc/adminservice/named_entity.go index d48a0485e2..4ef8f3ee0b 100644 --- a/flyteadmin/pkg/rpc/adminservice/named_entity.go +++ b/flyteadmin/pkg/rpc/adminservice/named_entity.go @@ -11,7 +11,6 @@ import ( ) func (m *AdminService) GetNamedEntity(ctx context.Context, request *admin.NamedEntityGetRequest) (*admin.NamedEntity, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not 
allowed") } @@ -31,7 +30,6 @@ func (m *AdminService) GetNamedEntity(ctx context.Context, request *admin.NamedE func (m *AdminService) UpdateNamedEntity(ctx context.Context, request *admin.NamedEntityUpdateRequest) ( *admin.NamedEntityUpdateResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -50,7 +48,6 @@ func (m *AdminService) UpdateNamedEntity(ctx context.Context, request *admin.Nam func (m *AdminService) ListNamedEntities(ctx context.Context, request *admin.NamedEntityListRequest) ( *admin.NamedEntityList, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } diff --git a/flyteadmin/pkg/rpc/adminservice/node_execution.go b/flyteadmin/pkg/rpc/adminservice/node_execution.go index cf17e3ff70..1b187f3a35 100644 --- a/flyteadmin/pkg/rpc/adminservice/node_execution.go +++ b/flyteadmin/pkg/rpc/adminservice/node_execution.go @@ -14,7 +14,6 @@ import ( func (m *AdminService) CreateNodeEvent( ctx context.Context, request *admin.NodeExecutionEventRequest) (*admin.NodeExecutionEventResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -32,7 +31,6 @@ func (m *AdminService) CreateNodeEvent( func (m *AdminService) GetNodeExecution( ctx context.Context, request *admin.NodeExecutionGetRequest) (*admin.NodeExecution, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -49,7 +47,6 @@ func (m *AdminService) GetNodeExecution( } func (m *AdminService) GetDynamicNodeWorkflow(ctx context.Context, request *admin.GetDynamicNodeWorkflowRequest) (*admin.DynamicNodeWorkflowResponse, error) { - defer 
m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -68,7 +65,6 @@ func (m *AdminService) GetDynamicNodeWorkflow(ctx context.Context, request *admi func (m *AdminService) ListNodeExecutions( ctx context.Context, request *admin.NodeExecutionListRequest) (*admin.NodeExecutionList, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -86,7 +82,6 @@ func (m *AdminService) ListNodeExecutions( func (m *AdminService) ListNodeExecutionsForTask( ctx context.Context, request *admin.NodeExecutionForTaskListRequest) (*admin.NodeExecutionList, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -111,7 +106,6 @@ func (m *AdminService) ListNodeExecutionsForTask( func (m *AdminService) GetNodeExecutionData( ctx context.Context, request *admin.NodeExecutionGetDataRequest) (*admin.NodeExecutionGetDataResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } diff --git a/flyteadmin/pkg/rpc/adminservice/project.go b/flyteadmin/pkg/rpc/adminservice/project.go index 5e7352ad93..ab8d8e4375 100644 --- a/flyteadmin/pkg/rpc/adminservice/project.go +++ b/flyteadmin/pkg/rpc/adminservice/project.go @@ -12,7 +12,6 @@ import ( func (m *AdminService) RegisterProject(ctx context.Context, request *admin.ProjectRegisterRequest) ( *admin.ProjectRegisterResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -29,7 +28,6 @@ func (m *AdminService) RegisterProject(ctx context.Context, request *admin.Proje } func (m *AdminService) 
ListProjects(ctx context.Context, request *admin.ProjectListRequest) (*admin.Projects, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -48,7 +46,6 @@ func (m *AdminService) ListProjects(ctx context.Context, request *admin.ProjectL func (m *AdminService) UpdateProject(ctx context.Context, request *admin.Project) ( *admin.ProjectUpdateResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -65,7 +62,6 @@ func (m *AdminService) UpdateProject(ctx context.Context, request *admin.Project } func (m *AdminService) GetProject(ctx context.Context, request *admin.ProjectGetRequest) (*admin.Project, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -83,7 +79,6 @@ func (m *AdminService) GetProject(ctx context.Context, request *admin.ProjectGet } func (m *AdminService) GetDomains(ctx context.Context, request *admin.GetDomainRequest) (*admin.GetDomainsResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } diff --git a/flyteadmin/pkg/rpc/adminservice/task.go b/flyteadmin/pkg/rpc/adminservice/task.go index 8899480489..7db51ed2eb 100644 --- a/flyteadmin/pkg/rpc/adminservice/task.go +++ b/flyteadmin/pkg/rpc/adminservice/task.go @@ -15,7 +15,6 @@ import ( func (m *AdminService) CreateTask( ctx context.Context, request *admin.TaskCreateRequest) (*admin.TaskCreateResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -32,7 +31,6 @@ func (m *AdminService) CreateTask( } func (m *AdminService) 
GetTask(ctx context.Context, request *admin.ObjectGetRequest) (*admin.Task, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -56,7 +54,6 @@ func (m *AdminService) GetTask(ctx context.Context, request *admin.ObjectGetRequ func (m *AdminService) ListTaskIds( ctx context.Context, request *admin.NamedEntityIdentifierListRequest) (*admin.NamedEntityIdentifierList, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -74,7 +71,6 @@ func (m *AdminService) ListTaskIds( } func (m *AdminService) ListTasks(ctx context.Context, request *admin.ResourceListRequest) (*admin.TaskList, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } diff --git a/flyteadmin/pkg/rpc/adminservice/task_execution.go b/flyteadmin/pkg/rpc/adminservice/task_execution.go index 0561a1ba36..0638c02aa3 100644 --- a/flyteadmin/pkg/rpc/adminservice/task_execution.go +++ b/flyteadmin/pkg/rpc/adminservice/task_execution.go @@ -15,7 +15,6 @@ import ( func (m *AdminService) CreateTaskEvent( ctx context.Context, request *admin.TaskExecutionEventRequest) (*admin.TaskExecutionEventResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -34,7 +33,6 @@ func (m *AdminService) CreateTaskEvent( func (m *AdminService) GetTaskExecution( ctx context.Context, request *admin.TaskExecutionGetRequest) (*admin.TaskExecution, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -62,7 +60,6 @@ func (m *AdminService) GetTaskExecution( func (m 
*AdminService) ListTaskExecutions( ctx context.Context, request *admin.TaskExecutionListRequest) (*admin.TaskExecutionList, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Nil request") } @@ -84,7 +81,6 @@ func (m *AdminService) ListTaskExecutions( func (m *AdminService) GetTaskExecutionData( ctx context.Context, request *admin.TaskExecutionGetDataRequest) (*admin.TaskExecutionGetDataResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } diff --git a/flyteadmin/pkg/rpc/adminservice/version.go b/flyteadmin/pkg/rpc/adminservice/version.go index 7fb5861e50..3049a723aa 100644 --- a/flyteadmin/pkg/rpc/adminservice/version.go +++ b/flyteadmin/pkg/rpc/adminservice/version.go @@ -8,7 +8,6 @@ import ( func (m *AdminService) GetVersion(ctx context.Context, request *admin.GetVersionRequest) (*admin.GetVersionResponse, error) { - defer m.interceptPanic(ctx, request) response, err := m.VersionManager.GetVersion(ctx, request) if err != nil { return nil, err diff --git a/flyteadmin/pkg/rpc/adminservice/workflow.go b/flyteadmin/pkg/rpc/adminservice/workflow.go index 9fcf87c453..7f6ecc4c13 100644 --- a/flyteadmin/pkg/rpc/adminservice/workflow.go +++ b/flyteadmin/pkg/rpc/adminservice/workflow.go @@ -15,7 +15,6 @@ import ( func (m *AdminService) CreateWorkflow( ctx context.Context, request *admin.WorkflowCreateRequest) (*admin.WorkflowCreateResponse, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -32,7 +31,6 @@ func (m *AdminService) CreateWorkflow( } func (m *AdminService) GetWorkflow(ctx context.Context, request *admin.ObjectGetRequest) (*admin.Workflow, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, 
"Incorrect request, nil requests not allowed") } @@ -56,7 +54,6 @@ func (m *AdminService) GetWorkflow(ctx context.Context, request *admin.ObjectGet func (m *AdminService) ListWorkflowIds(ctx context.Context, request *admin.NamedEntityIdentifierListRequest) ( *admin.NamedEntityIdentifierList, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -75,7 +72,6 @@ func (m *AdminService) ListWorkflowIds(ctx context.Context, request *admin.Named } func (m *AdminService) ListWorkflows(ctx context.Context, request *admin.ResourceListRequest) (*admin.WorkflowList, error) { - defer m.interceptPanic(ctx, request) if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } diff --git a/flyteadmin/pkg/server/service.go b/flyteadmin/pkg/server/service.go index ff80c343d3..bb09f9f615 100644 --- a/flyteadmin/pkg/server/service.go +++ b/flyteadmin/pkg/server/service.go @@ -12,6 +12,7 @@ import ( "github.com/gorilla/handlers" grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware" grpcauth "github.com/grpc-ecosystem/go-grpc-middleware/auth" + grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/pkg/errors" @@ -35,6 +36,7 @@ import ( "github.com/flyteorg/flyte/flyteadmin/pkg/config" "github.com/flyteorg/flyte/flyteadmin/pkg/rpc" "github.com/flyteorg/flyte/flyteadmin/pkg/rpc/adminservice" + "github.com/flyteorg/flyte/flyteadmin/pkg/rpc/adminservice/middleware" runtime2 "github.com/flyteorg/flyte/flyteadmin/pkg/runtime" runtimeIfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces" "github.com/flyteorg/flyte/flyteadmin/plugins" @@ -98,11 +100,18 @@ func newGRPCServer(ctx context.Context, pluginRegistry *plugins.Registry, cfg *c 
otelgrpc.WithPropagators(propagation.TraceContext{}), ) + adminScope := scope.NewSubScope("admin") + recoveryInterceptor := middleware.NewRecoveryInterceptor(adminScope) + var chainedUnaryInterceptors grpc.UnaryServerInterceptor if cfg.Security.UseAuth { logger.Infof(ctx, "Creating gRPC server with authentication") middlewareInterceptors := plugins.Get[grpc.UnaryServerInterceptor](pluginRegistry, plugins.PluginIDUnaryServiceMiddleware) - chainedUnaryInterceptors = grpcmiddleware.ChainUnaryServer(grpcprometheus.UnaryServerInterceptor, + chainedUnaryInterceptors = grpcmiddleware.ChainUnaryServer( + // recovery interceptor should always be first in order to handle any panics in the middleware or server + recoveryInterceptor.UnaryServerInterceptor(), + grpcrecovery.UnaryServerInterceptor(), + grpcprometheus.UnaryServerInterceptor, otelUnaryServerInterceptor, auth.GetAuthenticationCustomMetadataInterceptor(authCtx), grpcauth.UnaryServerInterceptor(auth.GetAuthenticationInterceptor(authCtx)), @@ -111,11 +120,23 @@ func newGRPCServer(ctx context.Context, pluginRegistry *plugins.Registry, cfg *c ) } else { logger.Infof(ctx, "Creating gRPC server without authentication") - chainedUnaryInterceptors = grpcmiddleware.ChainUnaryServer(grpcprometheus.UnaryServerInterceptor, otelUnaryServerInterceptor) + chainedUnaryInterceptors = grpcmiddleware.ChainUnaryServer( + // recovery interceptor should always be first in order to handle any panics in the middleware or server + recoveryInterceptor.UnaryServerInterceptor(), + grpcprometheus.UnaryServerInterceptor, + otelUnaryServerInterceptor, + ) } + chainedStreamInterceptors := grpcmiddleware.ChainStreamServer( + // recovery interceptor should always be first in order to handle any panics in the middleware or server + recoveryInterceptor.StreamServerInterceptor(), + grpcprometheus.StreamServerInterceptor, + ) + serverOpts := []grpc.ServerOption{ - grpc.StreamInterceptor(grpcprometheus.StreamServerInterceptor), + // recovery interceptor 
should always be first in order to handle any panics in the middleware or server + grpc.StreamInterceptor(chainedStreamInterceptors), grpc.UnaryInterceptor(chainedUnaryInterceptors), } if cfg.GrpcConfig.MaxMessageSizeBytes > 0 { @@ -131,7 +152,7 @@ func newGRPCServer(ctx context.Context, pluginRegistry *plugins.Registry, cfg *c } configuration := runtime2.NewConfigurationProvider() - adminServer := adminservice.NewAdminServer(ctx, pluginRegistry, configuration, cfg.KubeConfig, cfg.Master, dataStorageClient, scope.NewSubScope("admin")) + adminServer := adminservice.NewAdminServer(ctx, pluginRegistry, configuration, cfg.KubeConfig, cfg.Master, dataStorageClient, adminScope) grpcService.RegisterAdminServiceServer(grpcServer, adminServer) if cfg.Security.UseAuth { grpcService.RegisterAuthMetadataServiceServer(grpcServer, authCtx.AuthMetadataService()) From d8e7491e29672537f5ebd207135d3420a86f4080 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Fri, 2 Aug 2024 11:41:16 +0800 Subject: [PATCH 03/29] Snowflake agent Doc (#5620) * TEST build Signed-off-by: Future-Outlier * remove emphasize-lines Signed-off-by: Future-Outlier * test build Signed-off-by: Future-Outlier * revert Signed-off-by: Future-Outlier --------- Signed-off-by: Future-Outlier Signed-off-by: Bugra Gedik --- docs/deployment/agents/index.md | 6 ++++-- docs/deployment/agents/snowflake.rst | 18 +++++++++++++----- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/docs/deployment/agents/index.md b/docs/deployment/agents/index.md index 912ab8613c..11ce607788 100644 --- a/docs/deployment/agents/index.md +++ b/docs/deployment/agents/index.md @@ -25,10 +25,12 @@ If you are using a managed deployment of Flyte, you will need to contact your de - Configuring your Flyte deployment for the BigQuery agent. * - {ref}`MMCloud Agent ` - Configuring your Flyte deployment for the MMCloud agent. -* - {ref}`Sensor Agent ` - - Configuring your Flyte deployment for the sensor agent. 
* - {ref}`SageMaker Inference ` - Deploy models and create, as well as trigger inference endpoints on SageMaker. +* - {ref}`Sensor Agent ` + - Configuring your Flyte deployment for the sensor agent. +* - {ref}`Snowflake Agent ` + - Configuring your Flyte deployment for the SnowFlake agent. * - {ref}`OpenAI Batch ` - Submit requests to OpenAI GPT models for asynchronous batch processing. ``` diff --git a/docs/deployment/agents/snowflake.rst b/docs/deployment/agents/snowflake.rst index fe1c8482ae..a689c748bf 100644 --- a/docs/deployment/agents/snowflake.rst +++ b/docs/deployment/agents/snowflake.rst @@ -1,16 +1,25 @@ .. _deployment-agent-setup-snowflake: Snowflake agent -================= +=============== This guide provides an overview of how to set up the Snowflake agent in your Flyte deployment. 1. Set up the key pair authentication in Snowflake. For more details, see the `Snowflake key-pair authentication and key-pair rotation guide `__. -2. Create a secret with the group "snowflake" and the key "private_key". For more details, see `"Using Secrets in a Task" `__. +2. Create a secret with the group "private_key" and the key "snowflake". + This is hardcoded in the flytekit sdk, since we can't know the group and key name in advance. + This is for permission to upload and download data with structured dataset in python task pod. .. code-block:: bash - kubectl create secret generic snowflake-private-key --namespace=flytesnacks-development --from-file=your_private_key_above + kubectl create secret generic private-key --from-file=snowflake= --namespace=flytesnacks-development + +3. Create a secret in the flyteagent's pod, this is for execution snowflake query in the agent pod. + +.. 
code-block:: bash + + ENCODED_VALUE=$(cat | base64) && kubectl patch secret flyteagent -n flyte --patch "{\"data\":{\"snowflake_private_key\":\"$ENCODED_VALUE\"}}" + Specify agent configuration ---------------------------- @@ -73,7 +82,7 @@ Specify agent configuration supportedTaskTypes: - snowflake -Ensure that the propeller has the correct service account for BigQuery. +Ensure that the propeller has the correct service account for Snowflake. Upgrade the Flyte Helm release ------------------------------ @@ -97,7 +106,6 @@ Upgrade the Flyte Helm release helm upgrade flyte/flyte-core -n --values values-override.yaml Replace ```` with the name of your release (e.g., ``flyte``) - and ```` with the name of your namespace (e.g., ``flyte``). For Snowflake agent on the Flyte cluster, see `Snowflake agent `_. From 021c606edd2cddcaef2caf01a9c1d3da86d39fa5 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Fri, 2 Aug 2024 14:24:36 +0800 Subject: [PATCH 04/29] [flytepropeller][compiler] Error Handling when Type is not found (#5612) * FlytePropeller Compiler Avoid Crash when Type not found Signed-off-by: Future-Outlier * Update pingsu's error message advices Signed-off-by: Future-Outlier Co-authored-by: pingsutw * fix lint Signed-off-by: Future-Outlier * Trigger CI Signed-off-by: Future-Outlier * Trigger CI Signed-off-by: Future-Outlier --------- Signed-off-by: Future-Outlier Co-authored-by: pingsutw Signed-off-by: Bugra Gedik --- .../validation/launch_plan_validator_test.go | 14 ++++--- .../pkg/manager/impl/validation/validation.go | 17 +++++++++ .../impl/validation/validation_test.go | 38 +++++++++++++++++++ 3 files changed, 64 insertions(+), 5 deletions(-) diff --git a/flyteadmin/pkg/manager/impl/validation/launch_plan_validator_test.go b/flyteadmin/pkg/manager/impl/validation/launch_plan_validator_test.go index 86bfc5c6b7..178c2b497b 100644 --- a/flyteadmin/pkg/manager/impl/validation/launch_plan_validator_test.go +++ 
b/flyteadmin/pkg/manager/impl/validation/launch_plan_validator_test.go @@ -13,6 +13,10 @@ import ( "github.com/flyteorg/flyte/flytestdlib/utils" ) +const ( + foo = "foo" +) + var lpApplicationConfig = testutils.GetApplicationConfigWithDefaultDomains() func getWorkflowInterface() *core.TypedInterface { @@ -344,7 +348,7 @@ func TestValidateSchedule_KickoffTimeArgPointsAtWrongType(t *testing.T) { request := testutils.GetLaunchPlanRequestWithDeprecatedCronSchedule("* * * * * *") inputMap := &core.ParameterMap{ Parameters: map[string]*core.Parameter{ - "foo": { + foo: { Var: &core.Variable{ Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRING}}, }, @@ -354,7 +358,7 @@ func TestValidateSchedule_KickoffTimeArgPointsAtWrongType(t *testing.T) { }, }, } - request.Spec.EntityMetadata.Schedule.KickoffTimeInputArg = "foo" + request.Spec.EntityMetadata.Schedule.KickoffTimeInputArg = foo err := validateSchedule(request, inputMap) assert.NotNil(t, err) @@ -364,7 +368,7 @@ func TestValidateSchedule_NoRequired(t *testing.T) { request := testutils.GetLaunchPlanRequestWithDeprecatedCronSchedule("* * * * * *") inputMap := &core.ParameterMap{ Parameters: map[string]*core.Parameter{ - "foo": { + foo: { Var: &core.Variable{ Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRING}}, }, @@ -383,7 +387,7 @@ func TestValidateSchedule_KickoffTimeBound(t *testing.T) { request := testutils.GetLaunchPlanRequestWithDeprecatedCronSchedule("* * * * * *") inputMap := &core.ParameterMap{ Parameters: map[string]*core.Parameter{ - "foo": { + foo: { Var: &core.Variable{ Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_DATETIME}}, }, @@ -393,7 +397,7 @@ func TestValidateSchedule_KickoffTimeBound(t *testing.T) { }, }, } - request.Spec.EntityMetadata.Schedule.KickoffTimeInputArg = "foo" + request.Spec.EntityMetadata.Schedule.KickoffTimeInputArg = foo err := validateSchedule(request, inputMap) assert.Nil(t, err) diff 
--git a/flyteadmin/pkg/manager/impl/validation/validation.go b/flyteadmin/pkg/manager/impl/validation/validation.go index 1958f25021..55c45db9bb 100644 --- a/flyteadmin/pkg/manager/impl/validation/validation.go +++ b/flyteadmin/pkg/manager/impl/validation/validation.go @@ -1,6 +1,7 @@ package validation import ( + "fmt" "net/url" "strconv" "strings" @@ -282,11 +283,27 @@ func validateParameterMap(inputMap *core.ParameterMap, fieldName string) error { defaultValue := defaultInput.GetDefault() if defaultValue != nil { inputType := validators.LiteralTypeForLiteral(defaultValue) + + if inputType == nil { + return errors.NewFlyteAdminErrorf(codes.InvalidArgument, + fmt.Sprintf( + "Flyte encountered an issue while determining\n"+ + "the type of the default value for Parameter '%s' in '%s'.\n"+ + "Registered type: [%s].\n"+ + "Flyte needs to support the latest FlyteIDL to support this type.\n"+ + "Suggested solution: Please update all of your Flyte images to the latest version and "+ + "try again.", + name, fieldName, defaultInput.GetVar().GetType().String(), + ), + ) + } + if !validators.AreTypesCastable(inputType, defaultInput.GetVar().GetType()) { return errors.NewFlyteAdminErrorf(codes.InvalidArgument, "Type mismatch for Parameter %s in %s has type %s, expected %s", name, fieldName, defaultInput.GetVar().GetType().String(), inputType.String()) } + if defaultInput.GetVar().GetType().GetSimple() == core.SimpleType_DATETIME { // Make datetime specific validations return ValidateDatetime(defaultValue) diff --git a/flyteadmin/pkg/manager/impl/validation/validation_test.go b/flyteadmin/pkg/manager/impl/validation/validation_test.go index a9fed38ee9..1aa0bc7bab 100644 --- a/flyteadmin/pkg/manager/impl/validation/validation_test.go +++ b/flyteadmin/pkg/manager/impl/validation/validation_test.go @@ -320,6 +320,44 @@ func TestValidateParameterMap(t *testing.T) { err := validateParameterMap(&exampleMap, "some text") assert.NoError(t, err) }) + t.Run("invalid because inputType is 
nil", func(t *testing.T) { + // Create a literal that will cause LiteralTypeForLiteral to return nil. + // For example, a scalar with no value. + unsupportedLiteral := &core.Literal{ + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{}, + }, + } + + name := "foo" + fieldName := "test_field_name" + exampleMap := core.ParameterMap{ + Parameters: map[string]*core.Parameter{ + name: { + Var: &core.Variable{ + // 1000 means an unsupported type + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: 1000}}, + }, + Behavior: &core.Parameter_Default{ + Default: unsupportedLiteral, + }, + }, + }, + } + err := validateParameterMap(&exampleMap, fieldName) + assert.Error(t, err) + fmt.Println(err.Error()) + expectedErrMsg := fmt.Sprintf( + "Flyte encountered an issue while determining\n"+ + "the type of the default value for Parameter '%s' in '%s'.\n"+ + "Registered type: [%s].\n"+ + "Flyte needs to support the latest FlyteIDL to support this type.\n"+ + "Suggested solution: Please update all of your Flyte images to the latest version and "+ + "try again.", + name, fieldName, exampleMap.Parameters[name].GetVar().GetType().String(), + ) + assert.Equal(t, expectedErrMsg, err.Error()) + }) } func TestValidateToken(t *testing.T) { From 91d6d401babe332cb1b3ee7f971641efb064586f Mon Sep 17 00:00:00 2001 From: Jason Parraga Date: Thu, 1 Aug 2024 23:46:52 -0700 Subject: [PATCH 05/29] Fix nil pointer when task plugin load returns error (#5622) Signed-off-by: Bugra Gedik --- .../pkg/controller/nodes/task/handler.go | 7 ++++--- .../pkg/controller/nodes/task/handler_test.go | 18 ++++++++++++++++++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index d1595890d8..9ec47985c9 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -248,13 +248,14 @@ func (t *Handler) Setup(ctx context.Context, 
sCtx interfaces.SetupContext) error logger.Infof(ctx, "Loading Plugin [%s] ENABLED", p.ID) cp, err := pluginCore.LoadPlugin(ctx, sCtxFinal, p) + if err != nil { + return regErrors.Wrapf(err, "failed to load plugin - %s", p.ID) + } + if cp.GetID() == agent.ID { t.agentService.CorePlugin = cp } - if err != nil { - return regErrors.Wrapf(err, "failed to load plugin - %s", p.ID) - } // For every default plugin for a task type specified in flytepropeller config we validate that the plugin's // static definition includes that task type as something it is registered to handle. for _, tt := range p.RegisteredTaskTypes { diff --git a/flytepropeller/pkg/controller/nodes/task/handler_test.go b/flytepropeller/pkg/controller/nodes/task/handler_test.go index 4e6798cfef..31e1be9a7f 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/task/handler_test.go @@ -126,6 +126,8 @@ func Test_task_Setup(t *testing.T) { k8sPluginDefault := &pluginK8sMocks.Plugin{} k8sPluginDefault.OnGetProperties().Return(pluginK8s.PluginProperties{}) + loadErrorPluginType := "loadError" + corePluginEntry := pluginCore.PluginEntry{ ID: corePluginType, RegisteredTaskTypes: []pluginCore.TaskType{corePluginType}, @@ -154,6 +156,13 @@ func Test_task_Setup(t *testing.T) { RegisteredTaskTypes: []pluginCore.TaskType{k8sPluginDefaultType}, ResourceToWatch: &v1.Pod{}, } + loadErrorPluginEntry := pluginCore.PluginEntry{ + ID: loadErrorPluginType, + RegisteredTaskTypes: []pluginCore.TaskType{loadErrorPluginType}, + LoadPlugin: func(ctx context.Context, iCtx pluginCore.SetupContext) (pluginCore.Plugin, error) { + return nil, fmt.Errorf("test") + }, + } type wantFields struct { pluginIDs map[pluginCore.TaskType]string @@ -232,6 +241,15 @@ func Test_task_Setup(t *testing.T) { }, }, false}, + {"load-error", + testPluginRegistry{ + core: []pluginCore.PluginEntry{loadErrorPluginEntry}, + k8s: []pluginK8s.PluginEntry{}, + }, + []string{loadErrorPluginType}, + 
map[string]string{corePluginType: loadErrorPluginType}, + wantFields{}, + true}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From eae4bf57937cf245177244ff6555dae85d2cfa23 Mon Sep 17 00:00:00 2001 From: Jason Parraga Date: Thu, 1 Aug 2024 23:48:45 -0700 Subject: [PATCH 06/29] Log stack trace when refresh cache sync recovers from panic (#5623) Signed-off-by: Bugra Gedik --- flytestdlib/cache/auto_refresh.go | 5 ++-- flytestdlib/cache/auto_refresh_test.go | 37 ++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/flytestdlib/cache/auto_refresh.go b/flytestdlib/cache/auto_refresh.go index bb23ef9369..8218e577a8 100644 --- a/flytestdlib/cache/auto_refresh.go +++ b/flytestdlib/cache/auto_refresh.go @@ -3,6 +3,7 @@ package cache import ( "context" "fmt" + "runtime/debug" "sync" "time" @@ -290,9 +291,9 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { } if err, isErr = rVal.(error); isErr { - err = fmt.Errorf("worker panic'd and is shutting down. Error: %w", err) + err = fmt.Errorf("worker panic'd and is shutting down. Error: %w with Stack: %v", err, string(debug.Stack())) } else { - err = fmt.Errorf("worker panic'd and is shutting down. Panic value: %v", rVal) + err = fmt.Errorf("worker panic'd and is shutting down. 
Panic value: %v with Stack: %v", rVal, string(debug.Stack())) } logger.Error(ctx, err) diff --git a/flytestdlib/cache/auto_refresh_test.go b/flytestdlib/cache/auto_refresh_test.go index e798300f5d..5e1c49777e 100644 --- a/flytestdlib/cache/auto_refresh_test.go +++ b/flytestdlib/cache/auto_refresh_test.go @@ -64,6 +64,15 @@ func syncTerminalItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error panic("This should never be called") } +type panickingSyncer struct { + callCount atomic.Int32 +} + +func (p *panickingSyncer) sync(_ context.Context, _ Batch) ([]ItemSyncResponse, error) { + p.callCount.Inc() + panic("testing") +} + func TestCacheFour(t *testing.T) { testResyncPeriod := 10 * time.Millisecond rateLimiter := workqueue.DefaultControllerRateLimiter() @@ -172,6 +181,34 @@ func TestCacheFour(t *testing.T) { cancel() }) + + t.Run("Test panic on sync and shutdown", func(t *testing.T) { + syncer := &panickingSyncer{} + cache, err := NewAutoRefreshCache("fake3", syncer.sync, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope()) + assert.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + assert.NoError(t, cache.Start(ctx)) + + itemID := "dummy_id" + _, err = cache.GetOrCreate(itemID, fakeCacheItem{ + val: 0, + }) + assert.NoError(t, err) + + // wait for all workers to run + assert.Eventually(t, func() bool { + return syncer.callCount.Load() == int32(10) + }, 5*time.Second, time.Millisecond) + + // wait some more time + time.Sleep(500 * time.Millisecond) + + // all workers should have shut down. 
+ assert.Equal(t, int32(10), syncer.callCount.Load()) + + cancel() + }) } func TestQueueBuildUp(t *testing.T) { From dcbc55a3d7b1b2c81654a03158584fdd10220f2d Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Fri, 2 Aug 2024 17:48:36 +0800 Subject: [PATCH 07/29] use private-key (#5626) Signed-off-by: Bugra Gedik --- docs/deployment/agents/snowflake.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/deployment/agents/snowflake.rst b/docs/deployment/agents/snowflake.rst index a689c748bf..d6ee74125b 100644 --- a/docs/deployment/agents/snowflake.rst +++ b/docs/deployment/agents/snowflake.rst @@ -6,7 +6,7 @@ Snowflake agent This guide provides an overview of how to set up the Snowflake agent in your Flyte deployment. 1. Set up the key pair authentication in Snowflake. For more details, see the `Snowflake key-pair authentication and key-pair rotation guide `__. -2. Create a secret with the group "private_key" and the key "snowflake". +2. Create a secret with the group "private-key" and the key "snowflake". This is hardcoded in the flytekit sdk, since we can't know the group and key name in advance. This is for permission to upload and download data with structured dataset in python task pod. 
From 61cffe82a2a0a2f352046dc194eba6695144ce49 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Fri, 2 Aug 2024 17:55:47 +0800 Subject: [PATCH 08/29] Explain how Agent Secret Works (#5625) * first version Signed-off-by: Future-Outlier * update Signed-off-by: Future-Outlier --------- Signed-off-by: Future-Outlier Signed-off-by: Bugra Gedik --- docs/flyte_agents/how_secret_works_in_agent.md | 17 +++++++++++++++++ docs/flyte_agents/index.md | 3 +++ 2 files changed, 20 insertions(+) create mode 100644 docs/flyte_agents/how_secret_works_in_agent.md diff --git a/docs/flyte_agents/how_secret_works_in_agent.md b/docs/flyte_agents/how_secret_works_in_agent.md new file mode 100644 index 0000000000..7abada46ac --- /dev/null +++ b/docs/flyte_agents/how_secret_works_in_agent.md @@ -0,0 +1,17 @@ +--- +jupytext: + formats: md:myst + text_representation: + extension: .md + format_name: myst +--- + +(how_secret_works_in_agent)= +# How Secret Works in Agent + +In Flyte agent's deployment, we mount secrets in Kubernetes with the namespace `flyte` and the name `flyteagent`. +If you want to add secrets for agents, you can use the following command: + +```bash +SECRET_VALUE=$( | base64) && kubectl patch secret flyteagent -n flyte --patch "{\"data\":{\"your_agent_secret_name\":\"$SECRET_VALUE\"}}" +``` diff --git a/docs/flyte_agents/index.md b/docs/flyte_agents/index.md index e7d627a670..a32200cde6 100644 --- a/docs/flyte_agents/index.md +++ b/docs/flyte_agents/index.md @@ -36,6 +36,8 @@ You can create different agent services that host different agents, e.g., a prod - Once you have tested your new agent in a local development cluster and want to use it in production, you should test it in the Flyte sandbox. * - {doc}`Implementing the agent metadata service ` - If you want to develop an agent server in a language other than Python (e.g., Rust or Java), you must implement the agent metadata service in your agent server. 
+* - {doc}`How secret works in agent ` + - Explain how secret works in your agent server. ``` ```{toctree} @@ -48,4 +50,5 @@ developing_agents testing_agents_in_a_local_development_cluster deploying_agents_to_the_flyte_sandbox implementing_the_agent_metadata_service +how_secret_works_in_agent ``` From 2cf52ef8f169bd38d9b39a37bbc2960db51ea8b0 Mon Sep 17 00:00:00 2001 From: ddl-rliu <140021987+ddl-rliu@users.noreply.github.com> Date: Fri, 2 Aug 2024 03:03:48 -0700 Subject: [PATCH 09/29] Fix typo in execution manager (#5619) Signed-off-by: ddl-rliu <140021987+ddl-rliu@users.noreply.github.com> Signed-off-by: Bugra Gedik --- flyteadmin/pkg/manager/impl/execution_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index 337301977e..13521cedbb 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -1719,7 +1719,7 @@ func (m *ExecutionManager) TerminateExecution( } if common.IsExecutionTerminal(core.WorkflowExecution_Phase(core.WorkflowExecution_Phase_value[executionModel.Phase])) { - return nil, errors.NewAlreadyInTerminalStateError(ctx, "Cannot abort an already terminate workflow execution", executionModel.Phase) + return nil, errors.NewAlreadyInTerminalStateError(ctx, "Cannot abort an already terminated workflow execution", executionModel.Phase) } err = transformers.SetExecutionAborting(&executionModel, request.Cause, getUser(ctx)) From a65a5903fcfe6522577b1300423f465e3bc632e3 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 2 Aug 2024 12:53:33 -0700 Subject: [PATCH 10/29] Amend Admin to use grpc message size (#5628) * add send arg Signed-off-by: Yee Hing Tong * Add acction to remove cache in gh runner Signed-off-by: Eduardo Apolinario * Use correct checked out path Signed-off-by: Eduardo Apolinario * Path in strings Signed-off-by: Eduardo Apolinario * Checkout repo in root 
Signed-off-by: Eduardo Apolinario * Use the correct path to new action Signed-off-by: Eduardo Apolinario * Do not use gh var in path to clear-action-cache Signed-off-by: Eduardo Apolinario * Remove wrong invocation of clear-action-cache Signed-off-by: Eduardo Apolinario * GITHUB_WORKSPACE is implicit in the checkout action Signed-off-by: Eduardo Apolinario * Refer to local `flyte` directory Signed-off-by: Eduardo Apolinario --------- Signed-off-by: Yee Hing Tong Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario Signed-off-by: Bugra Gedik --- .github/actions/clear-action-cache/action.yml | 11 +++++++++++ .github/workflows/tests.yml | 6 ++++-- flyteadmin/pkg/server/service.go | 2 +- 3 files changed, 16 insertions(+), 3 deletions(-) create mode 100644 .github/actions/clear-action-cache/action.yml diff --git a/.github/actions/clear-action-cache/action.yml b/.github/actions/clear-action-cache/action.yml new file mode 100644 index 0000000000..a29347b61c --- /dev/null +++ b/.github/actions/clear-action-cache/action.yml @@ -0,0 +1,11 @@ +name: 'Clear action cache' +description: 'As suggested by GitHub to prevent low disk space: https://github.com/actions/runner-images/issues/2840#issuecomment-790492173' +runs: + using: 'composite' + steps: + - shell: bash + run: | + rm -rf /usr/share/dotnet + rm -rf /opt/ghc + rm -rf "/usr/local/share/boost" + rm -rf "$AGENT_TOOLSDIRECTORY" diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index cbce9cd054..1d69466464 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -30,12 +30,14 @@ jobs: - name: Fetch flyte code uses: actions/checkout@v4 with: - path: "${{ github.workspace }}/flyte" + path: flyte + - name: 'Clear action cache' + uses: ./flyte/.github/actions/clear-action-cache - name: Fetch flytekit code uses: actions/checkout@v4 with: repository: flyteorg/flytekit - path: "${{ github.workspace }}/flytekit" + path: flytekit - uses: conda-incubator/setup-miniconda@v3 
with: auto-update-conda: true diff --git a/flyteadmin/pkg/server/service.go b/flyteadmin/pkg/server/service.go index bb09f9f615..0a7371ef68 100644 --- a/flyteadmin/pkg/server/service.go +++ b/flyteadmin/pkg/server/service.go @@ -140,7 +140,7 @@ func newGRPCServer(ctx context.Context, pluginRegistry *plugins.Registry, cfg *c grpc.UnaryInterceptor(chainedUnaryInterceptors), } if cfg.GrpcConfig.MaxMessageSizeBytes > 0 { - serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(cfg.GrpcConfig.MaxMessageSizeBytes)) + serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(cfg.GrpcConfig.MaxMessageSizeBytes), grpc.MaxSendMsgSize(cfg.GrpcConfig.MaxMessageSizeBytes)) } serverOpts = append(serverOpts, opts...) grpcServer := grpc.NewServer(serverOpts...) From fe9fb6786551d872981fb61b5fb1cb56fa1a3349 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 6 Aug 2024 12:50:06 +0800 Subject: [PATCH 11/29] document the process of setting ttl for a ray cluster (#5636) Signed-off-by: Kevin Su Signed-off-by: Bugra Gedik --- docs/deployment/plugins/k8s/index.rst | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/deployment/plugins/k8s/index.rst b/docs/deployment/plugins/k8s/index.rst index a46ec23815..64fbb41136 100644 --- a/docs/deployment/plugins/k8s/index.rst +++ b/docs/deployment/plugins/k8s/index.rst @@ -272,6 +272,10 @@ Specify plugin configuration - container: container - container_array: k8s-array - ray: ray + plugins: + ray: + // Shutdown Ray cluster after 1 hour of inactivity + ttlSecondsAfterFinished: 3600 .. group-tab:: Flyte core @@ -294,6 +298,10 @@ Specify plugin configuration sidecar: sidecar container_array: k8s-array ray: ray + plugins: + ray: + // Shutdown Ray cluster after 1 hour of inactivity + ttlSecondsAfterFinished: 3600 .. 
group-tab:: Spark From 051443086b41acfaea7bb7a93d54b5fba8b3b74d Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Wed, 7 Aug 2024 16:24:11 -0700 Subject: [PATCH 12/29] Add CustomHeaderMatcher to pass additional headers (#5563) Signed-off-by: Andrew Dye Signed-off-by: Bugra Gedik --- flyteadmin/auth/handlers.go | 20 ++++++++++++++++++++ flyteadmin/pkg/server/service.go | 3 +++ flyteadmin/plugins/registry.go | 9 +++++---- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/flyteadmin/auth/handlers.go b/flyteadmin/auth/handlers.go index a6220db6e3..b839cf26d0 100644 --- a/flyteadmin/auth/handlers.go +++ b/flyteadmin/auth/handlers.go @@ -5,11 +5,13 @@ import ( "encoding/json" "fmt" "net/http" + "net/textproto" "net/url" "strings" "time" "github.com/grpc-ecosystem/go-grpc-middleware/util/metautils" + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "golang.org/x/oauth2" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -22,6 +24,7 @@ import ( "github.com/flyteorg/flyte/flyteadmin/pkg/common" "github.com/flyteorg/flyte/flyteadmin/plugins" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service" + "github.com/flyteorg/flyte/flytestdlib/contextutils" "github.com/flyteorg/flyte/flytestdlib/errors" "github.com/flyteorg/flyte/flytestdlib/logger" ) @@ -32,6 +35,8 @@ const ( FromHTTPVal = "true" ) +var XRequestID = textproto.CanonicalMIMEHeaderKey(contextutils.RequestIDKey.String()) + type PreRedirectHookError struct { Message string Code int @@ -533,3 +538,18 @@ func GetUserInfoForwardResponseHandler() UserInfoForwardResponseHandler { return nil } } + +func GetCustomHeaderMatcher(pluginRegistry *plugins.Registry) runtime.HeaderMatcherFunc { + if fn := plugins.Get[runtime.HeaderMatcherFunc](pluginRegistry, plugins.PluginIDCustomerHeaderMatcher); fn != nil { + return fn + } + return func(key string) (string, bool) { + canonicalKey := textproto.CanonicalMIMEHeaderKey(key) + switch canonicalKey { + case XRequestID: + return canonicalKey, true + default: 
+ return runtime.DefaultHeaderMatcher(key) + } + } +} diff --git a/flyteadmin/pkg/server/service.go b/flyteadmin/pkg/server/service.go index 0a7371ef68..587ea86e3b 100644 --- a/flyteadmin/pkg/server/service.go +++ b/flyteadmin/pkg/server/service.go @@ -240,6 +240,9 @@ func newHTTPServer(ctx context.Context, pluginRegistry *plugins.Registry, cfg *c // This option sets subject in the user info response gwmuxOptions = append(gwmuxOptions, runtime.WithForwardResponseOption(auth.GetUserInfoForwardResponseHandler())) + // Use custom header matcher to allow additional headers to be passed through + gwmuxOptions = append(gwmuxOptions, runtime.WithIncomingHeaderMatcher(auth.GetCustomHeaderMatcher(pluginRegistry))) + if cfg.Security.UseAuth { // Add HTTP handlers for OIDC endpoints auth.RegisterHandlers(ctx, mux, authCtx, pluginRegistry) diff --git a/flyteadmin/plugins/registry.go b/flyteadmin/plugins/registry.go index 92644b1367..a89a8dfeae 100644 --- a/flyteadmin/plugins/registry.go +++ b/flyteadmin/plugins/registry.go @@ -9,12 +9,13 @@ import ( type PluginID = string const ( - PluginIDWorkflowExecutor PluginID = "WorkflowExecutor" + PluginIDAdditionalGRPCService PluginID = "AdditionalGRPCService" + PluginIDCustomerHeaderMatcher PluginID = "CustomerHeaderMatcher" PluginIDDataProxy PluginID = "DataProxy" - PluginIDUnaryServiceMiddleware PluginID = "UnaryServiceMiddleware" - PluginIDPreRedirectHook PluginID = "PreRedirectHook" PluginIDLogoutHook PluginID = "LogoutHook" - PluginIDAdditionalGRPCService PluginID = "AdditionalGRPCService" + PluginIDPreRedirectHook PluginID = "PreRedirectHook" + PluginIDUnaryServiceMiddleware PluginID = "UnaryServiceMiddleware" + PluginIDWorkflowExecutor PluginID = "WorkflowExecutor" ) type AtomicRegistry struct { From 80c349d45994ced638aba02c82eed3ea6e4d8d71 Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> Date: Thu, 8 Aug 2024 11:57:03 -0700 Subject: [PATCH 13/29] Turn flyteidl and flytectl 
releases into manual gh workflows (#5635) * Make flyteidl releases go through a manual gh workflow Signed-off-by: Eduardo Apolinario * Make flytectl releases go through a manual gh workflow Signed-off-by: Eduardo Apolinario * Rewrite the documentation for `version` and clarify wording in RELEASE.md Signed-off-by: Eduardo Apolinario --------- Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario Signed-off-by: Bugra Gedik --- .github/workflows/create_release.yml | 1 - .github/workflows/flytectl-release.yml | 27 +++++++++++++++++++--- .github/workflows/flyteidl-release.yml | 31 ++++++++++++++++++++++---- flytectl/RELEASE.md | 2 +- flyteidl/RELEASE.md | 4 ++++ 5 files changed, 56 insertions(+), 9 deletions(-) create mode 100644 flyteidl/RELEASE.md diff --git a/.github/workflows/create_release.yml b/.github/workflows/create_release.yml index 1db5986925..e00c09f2d7 100644 --- a/.github/workflows/create_release.yml +++ b/.github/workflows/create_release.yml @@ -28,7 +28,6 @@ jobs: "datacatalog", "flyteadmin", "flytecopilot", - "flyteidl", "flyteplugins", "flytepropeller", "flytestdlib", diff --git a/.github/workflows/flytectl-release.yml b/.github/workflows/flytectl-release.yml index 2bfa6f28eb..2aba67dbe9 100644 --- a/.github/workflows/flytectl-release.yml +++ b/.github/workflows/flytectl-release.yml @@ -1,13 +1,34 @@ name: Flytectl release on: - push: - tags: - - flytectl/v*.*.* + workflow_dispatch: + inputs: + version: + description: "version. Do *not* use the `flytectl/` prefix, e.g. 
`flytectl/v1.2.3`, instead use only `v1.2.3` (including the `v`)" + required: true jobs: + push-flytectl-tag: + name: Push git tag containing the `flytectl/` prefix + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: '0' + - uses: actions/github-script@v6 + with: + github-token: ${{ secrets.FLYTE_BOT_PAT }} + script: | + github.rest.git.createRef({ + owner: context.repo.owner, + repo: context.repo.repo, + ref: `refs/tags/flytectl/${{ github.event.inputs.version }}`, + sha: context.sha + }) release: name: Goreleaser + needs: + - push-flytectl-tag runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 diff --git a/.github/workflows/flyteidl-release.yml b/.github/workflows/flyteidl-release.yml index 94c13645b2..2b19f7942d 100644 --- a/.github/workflows/flyteidl-release.yml +++ b/.github/workflows/flyteidl-release.yml @@ -1,12 +1,33 @@ -name: Upload flyteidl to PyPI and npm +name: Release flyteidl on: - push: - tags: - - flyteidl/v*.*.* + workflow_dispatch: + inputs: + version: + description: "version. Do *not* use the `flyteidl/` prefix, e.g. 
`flyteidl/v1.2.3`, instead use only `v1.2.3` (including the `v`)" + required: true jobs: + push-flyteidl-tag: + name: Push git tag containing the `flyteidl/` prefix + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: '0' + - uses: actions/github-script@v6 + with: + github-token: ${{ secrets.FLYTE_BOT_PAT }} + script: | + github.rest.git.createRef({ + owner: context.repo.owner, + repo: context.repo.repo, + ref: `refs/tags/flyteidl/${{ github.event.inputs.version }}`, + sha: context.sha + }) deploy-to-pypi: + needs: + - push-flyteidl-tag runs-on: ubuntu-latest defaults: run: @@ -29,6 +50,8 @@ jobs: python -m build twine upload dist/* deploy-to-npm: + needs: + - push-flyteidl-tag runs-on: ubuntu-latest defaults: run: diff --git a/flytectl/RELEASE.md b/flytectl/RELEASE.md index 646f7465a6..414aa24199 100644 --- a/flytectl/RELEASE.md +++ b/flytectl/RELEASE.md @@ -2,4 +2,4 @@ Flytectl releases map to git tags with the prefix `flytectl/` followed by a semver string, e.g. [flytectl/v0.9.0](https://github.com/flyteorg/flyte/releases/tag/flytectl%2Fv0.9.0). -To release a new version of flytectl push a new git tag in the format described above. This will kick off a <[github workflow](https://github.com/flyteorg/flyte/blob/master/.github/workflows/flytectl-release.yml) responsible for releasing this new version. Note how the git tag has to be formatted a certain way for the workflow to run. +To release a new version of flytectl run the [github workflow](https://github.com/flyteorg/flyte/blob/master/.github/workflows/flytectl-release.yml), which is responsible for releasing this new version. Remember to use valid semver versions, including adding the prefix `v`, e.g. `v1.2.3`. 
diff --git a/flyteidl/RELEASE.md b/flyteidl/RELEASE.md new file mode 100644 index 0000000000..eaaa4d51f6 --- /dev/null +++ b/flyteidl/RELEASE.md @@ -0,0 +1,4 @@ +# Release Process + +To release a new version of flyteidl run the [github workflow](https://github.com/flyteorg/flyte/blob/master/.github/workflows/flyteidl-release.yml), which is responsible for releasing this new version. Remember to use valid semver versions, including adding the prefix `v`, e.g. `v1.2.3`. + From ee724b14ad027abea5a264a388bb4cead313a5f7 Mon Sep 17 00:00:00 2001 From: Christina <156356273+cratiu222@users.noreply.github.com> Date: Thu, 8 Aug 2024 22:42:44 +0300 Subject: [PATCH 14/29] docs: fix typo (#5643) * fix CHANGELOG-v0.2.0.md Signed-off-by: Christina <156356273+cratiu222@users.noreply.github.com> * fix CHANGELOG-v1.0.2-b1.md Signed-off-by: Christina <156356273+cratiu222@users.noreply.github.com> * fix CHANGELOG-v1.1.0.md Signed-off-by: Christina <156356273+cratiu222@users.noreply.github.com> * fix CHANGELOG-v1.3.0.md Signed-off-by: Christina <156356273+cratiu222@users.noreply.github.com> --------- Signed-off-by: Christina <156356273+cratiu222@users.noreply.github.com> Signed-off-by: Bugra Gedik --- CHANGELOG/CHANGELOG-v0.2.0.md | 2 +- CHANGELOG/CHANGELOG-v1.0.2-b1.md | 2 +- CHANGELOG/CHANGELOG-v1.1.0.md | 2 +- CHANGELOG/CHANGELOG-v1.3.0.md | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/CHANGELOG/CHANGELOG-v0.2.0.md b/CHANGELOG/CHANGELOG-v0.2.0.md index 4c16bb0742..d3c85ff0fb 100644 --- a/CHANGELOG/CHANGELOG-v0.2.0.md +++ b/CHANGELOG/CHANGELOG-v0.2.0.md @@ -16,7 +16,7 @@ - RawOutputDirectories created in FlytePropeller - Improve visibility and observability - User/System error differentiation -- Optional interruptible tasks (lets use spot instances, reduce cost) +- Optional interruptible tasks (lets you use spot instances to reduce cost) - Caps on queue time for workflows - Multi cluster improvements - Visibility into execution cluster for the execution diff 
--git a/CHANGELOG/CHANGELOG-v1.0.2-b1.md b/CHANGELOG/CHANGELOG-v1.0.2-b1.md index 3ade2ab25e..ef216a0c25 100644 --- a/CHANGELOG/CHANGELOG-v1.0.2-b1.md +++ b/CHANGELOG/CHANGELOG-v1.0.2-b1.md @@ -5,7 +5,7 @@ 1. [Bugfix](https://github.com/flyteorg/flyte/issues/2444) With GRPC v1.46.0 non-ascii chars are not permitted in grpc metadata 1. [Housekeeping](https://github.com/flyteorg/flyte/issues/1698) Configure grpc_health_prob in admin 1. [Feature](https://github.com/flyteorg/flyte/issues/2329) In Flytectl use Launchplan with latest version for scheduled workflows -1. [Bugfix](https://github.com/flyteorg/flyte/issues/2262) Pods started before InjectFinalizer is disabled are never deleted +1. [Bugfix](https://github.com/flyteorg/flyte/issues/2262) Pods started before InjectFinalizer was disabled are never deleted 1. [Housekeeping](https://github.com/flyteorg/flyte/issues/2504) Checksum grpc_health_probe 1. [Feature](https://github.com/flyteorg/flyte/issues/2284) Allow to choose Spot Instances at workflow start time 1. [Feature](https://github.com/flyteorg/flyte/pull/2439) Use the same pod annotation formatting in syncresources cronjob diff --git a/CHANGELOG/CHANGELOG-v1.1.0.md b/CHANGELOG/CHANGELOG-v1.1.0.md index 9236270965..1cbad29584 100644 --- a/CHANGELOG/CHANGELOG-v1.1.0.md +++ b/CHANGELOG/CHANGELOG-v1.1.0.md @@ -17,7 +17,7 @@ Support for [Optional types](https://github.com/flyteorg/flyte/issues/2426). Wit ### Bug Fixes * [Propeller](https://github.com/flyteorg/flyte/issues/2298) calling finalize rather than abort -* [Propeller](https://github.com/flyteorg/flyte/issues/2404) correctly identify error when requesting a launch plan that does not exist. +* [Propeller](https://github.com/flyteorg/flyte/issues/2404) correctly identifies an error when requesting a launch plan that does not exist. * Better handle [execution CRDs](https://github.com/flyteorg/flyte/issues/2275) that don't exist in Admin. 
* [Fix panic](https://github.com/flyteorg/flyte/issues/2597) when creating additional label options. * Check [validity](https://github.com/flyteorg/flyte/issues/2601) of notifications. diff --git a/CHANGELOG/CHANGELOG-v1.3.0.md b/CHANGELOG/CHANGELOG-v1.3.0.md index c15224d4a6..0591f676ef 100644 --- a/CHANGELOG/CHANGELOG-v1.3.0.md +++ b/CHANGELOG/CHANGELOG-v1.3.0.md @@ -7,7 +7,7 @@ The main features of this 1.3 release are * Signaling/gate node support (human in the loop tasks) * User documentation support (backend and flytekit only, limited types) -The latter two are pending some work in Flyte console, they will be piped through fully by the end of Q1. Support for setting and approving gate nodes is supported in `FlyteRemote` however, though only a limited set of types can be passed in. +The latter two are pending some work in Flyte console, they will be piped through fully by the end of Q1. Setting and approving gate nodes is supported in `FlyteRemote`; however, only a limited set of types can be passed in. ## Notes There are a couple things to point out with this release. From a9beb65c340088c2a6be92c6885bc2387470cbf4 Mon Sep 17 00:00:00 2001 From: "Thomas J. Fan" Date: Thu, 8 Aug 2024 15:53:22 -0400 Subject: [PATCH 15/29] Use enable_deck=True in docs (#5645) Signed-off-by: Bugra Gedik --- docs/core_use_cases/analytics.md | 2 +- docs/core_use_cases/machine_learning.md | 2 +- .../visualizing_task_input_and_output.md | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/core_use_cases/analytics.md b/docs/core_use_cases/analytics.md index 886b75618d..71b5530c03 100644 --- a/docs/core_use_cases/analytics.md +++ b/docs/core_use_cases/analytics.md @@ -48,7 +48,7 @@ of the map. 
In this case, we normalize the `people_vaccinated` by the `population` count of each country: ```{code-cell} ipython3 -@task(disable_deck=False) +@task(enable_deck=True) def plot(df: pd.DataFrame): """Render a Choropleth map.""" df["text"] = df["location"] + "
" + "Last updated on: " + df["date"] diff --git a/docs/core_use_cases/machine_learning.md b/docs/core_use_cases/machine_learning.md index 489b8b05f9..6368b0aa54 100644 --- a/docs/core_use_cases/machine_learning.md +++ b/docs/core_use_cases/machine_learning.md @@ -112,7 +112,7 @@ There are many ways to extend your workloads: {ref}`Kubeflow Pytorch` and {doc}`more <_tags/DistributedComputing>` to do distributed training. * - **🔎 Experiment Tracking** - Auto-capture training logs with the {py:func}`~flytekitplugins.mlflow.mlflow_autolog` - decorator, which can be viewed as Flyte Decks with `@task(disable_decks=False)`. + decorator, which can be viewed as Flyte Decks with `@task(enable_deck=True)`. * - **⏩ Inference Acceleration** - Serialize your models in ONNX format using the {ref}`ONNX plugin `, which supports ScikitLearn, TensorFlow, and PyTorch. diff --git a/docs/flyte_fundamentals/visualizing_task_input_and_output.md b/docs/flyte_fundamentals/visualizing_task_input_and_output.md index 487d1627c9..0390d6cf44 100644 --- a/docs/flyte_fundamentals/visualizing_task_input_and_output.md +++ b/docs/flyte_fundamentals/visualizing_task_input_and_output.md @@ -22,14 +22,14 @@ how to generate an HTML report from some Python object. ## Enabling Flyte decks -To enable Flyte decks, simply set `disable_deck=False` in the `@task` decorator: +To enable Flyte decks, simply set `enable_deck=True` in the `@task` decorator: ```{code-cell} ipython3 import pandas as pd from flytekit import task, workflow -@task(disable_deck=False) +@task(enable_deck=True) def iris_data() -> pd.DataFrame: ... ``` @@ -51,7 +51,7 @@ from typing import Optional from flytekit import task, workflow -@task(disable_deck=False) +@task(enable_deck=True) def iris_data( sample_frac: Optional[float] = None, random_state: Optional[int] = None, @@ -168,7 +168,7 @@ function. 
In the following example, we extend the `iris_data` task with: import flytekit from flytekitplugins.deck.renderer import MarkdownRenderer, BoxRenderer -@task(disable_deck=False) +@task(enable_deck=True) def iris_data( sample_frac: Optional[float] = None, random_state: Optional[int] = None, @@ -220,7 +220,7 @@ except ImportError: from typing_extensions import Annotated -@task(disable_deck=False) +@task(enable_deck=True) def iris_data( sample_frac: Optional[float] = None, random_state: Optional[int] = None, From 392632d30a6c65ea3309ec412d85bc5cd90072ba Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> Date: Thu, 8 Aug 2024 14:44:11 -0700 Subject: [PATCH 16/29] Fix flyteidl release checkout all tags (#5646) * Fetch all tags in flyteidl-release.yml Signed-off-by: Eduardo Apolinario * Fix sed expression for npm job Signed-off-by: Eduardo Apolinario --------- Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario Signed-off-by: Bugra Gedik --- .github/workflows/flyteidl-release.yml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/flyteidl-release.yml b/.github/workflows/flyteidl-release.yml index 2b19f7942d..c895beba4b 100644 --- a/.github/workflows/flyteidl-release.yml +++ b/.github/workflows/flyteidl-release.yml @@ -34,6 +34,8 @@ jobs: working-directory: flyteidl steps: - uses: actions/checkout@v4 + with: + fetch-depth: '0' - name: Set up Python uses: actions/setup-python@v1 with: @@ -64,8 +66,8 @@ jobs: registry-url: "https://registry.npmjs.org" - name: Set version in npm package run: | - # from refs/tags/v1.2.3 get 1.2.3 - VERSION=$(echo $GITHUB_REF | sed 's#.*/v##') + # v1.2.3 get 1.2.3 + VERSION=$(echo ${{ inputs.version }} | sed 's#.*v##') VERSION=$VERSION make update_npmversion shell: bash - run: | From ee4783e81fbff669b1419645b7ab20ce08e9fa02 Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> Date: Thu, 8 Aug 2024 
15:57:30 -0700 Subject: [PATCH 17/29] Install pyarrow in sandbox functional tests (#5647) Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario Signed-off-by: Bugra Gedik --- .github/workflows/single-binary.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/single-binary.yml b/.github/workflows/single-binary.yml index 23d438b322..d4cb79f4d5 100644 --- a/.github/workflows/single-binary.yml +++ b/.github/workflows/single-binary.yml @@ -175,7 +175,7 @@ jobs: run: | python -m pip install --upgrade pip pip install uv - uv pip install --system flytekit flytekitplugins-deck-standard flytekitplugins-envd "numpy<2.0.0" + uv pip install --system flytekit flytekitplugins-deck-standard flytekitplugins-envd "numpy<2.0.0" pyarrow uv pip freeze - name: Checkout flytesnacks uses: actions/checkout@v4 From b21d674641cce0c44395b6a22bbda0a9003c0e6d Mon Sep 17 00:00:00 2001 From: desihsu <43691987+desihsu@users.noreply.github.com> Date: Fri, 9 Aug 2024 15:19:30 -0700 Subject: [PATCH 18/29] docs: add documentation for configuring notifications in GCP (#5545) * update Signed-off-by: Desi Hsu * dco Signed-off-by: Desi Hsu * dco Signed-off-by: Desi Hsu * typo Signed-off-by: Desi Hsu --------- Signed-off-by: Desi Hsu Signed-off-by: Bugra Gedik --- .../configuration/notifications.rst | 81 +++++++++++++++++-- 1 file changed, 73 insertions(+), 8 deletions(-) diff --git a/docs/deployment/configuration/notifications.rst b/docs/deployment/configuration/notifications.rst index 2e4a77ac53..fecad482fe 100644 --- a/docs/deployment/configuration/notifications.rst +++ b/docs/deployment/configuration/notifications.rst @@ -1,7 +1,8 @@ .. _deployment-configuration-notifications: +############# Notifications -------------- +############# .. tags:: Infrastructure, Advanced @@ -62,10 +63,10 @@ The ``notifications`` top-level portion of the FlyteAdmin config specifies how t As with schedules, the notifications handling is composed of two parts. 
One handles enqueuing notifications asynchronously and the second part handles processing pending notifications and actually firing off emails and alerts. -This is only supported for Flyte instances running on AWS. +This is only supported for Flyte instances running on AWS or GCP. -Config -======= +AWS Config +========== To publish notifications, you'll need to set up an `SNS topic `_. @@ -80,9 +81,7 @@ Let's look at the following config section and explain what each value represent .. code-block:: yaml notifications: - # Because AWS is the only cloud back-end supported for executing scheduled - # workflows in this case, only ``"aws"`` is a valid value. By default, the - #no-op executor is used. + # By default, the no-op executor is used. type: "aws" # This specifies which region AWS clients will use when creating SNS and SQS clients. @@ -126,10 +125,76 @@ into `code `__. .. rli:: https://raw.githubusercontent.com/flyteorg/flyteadmin/master/flyteadmin_config.yaml :caption: flyteadmin/flyteadmin_config.yaml :lines: 91-105 + +GCP Config +========== + +You'll need to set up a `Pub/Sub topic `__ to publish notifications to, +and a `Pub/Sub subscriber `__ to consume from that topic +and process notifications. The GCP service account used by FlyteAdmin must also have Pub/Sub publish and subscribe permissions. + +Email service +------------- + +In order to actually publish notifications, you'll need an account with an external email service which will be +used to send notification emails and alerts using email APIs. + +Currently, `SendGrid `__ is the only supported external email service, +and you will need to have a verified SendGrid sender. Create a SendGrid API key with ``Mail Send`` permissions +and save it to a file ``key``. + +Create a K8s secret in FlyteAdmin's cluster with that file: + +.. prompt:: bash $ + + kubectl create secret generic -n flyte --from-file key sendgrid-key + +Mount the secret by adding the following to the ``flyte-core`` values YAML: + +.. 
code-block:: yaml + + flyteadmin: + additionalVolumes: + - name: sendgrid-key + secret: + secretName: sendgrid-key + items: + - key: key + path: key + additionalVolumeMounts: + - name: sendgrid-key + mountPath: /sendgrid + +Config +------ + +In the ``flyte-core`` values YAML, the top-level ``notifications`` config should be +placed under ``workflow_notifications``. + +.. code-block:: yaml + + workflow_notifications: + enabled: true + config: + notifications: + type: gcp + gcp: + projectId: "{{ YOUR PROJECT ID }}" + publisher: + topicName: "{{ YOUR PUB/SUB TOPIC NAME }}" + processor: + queueName: "{{ YOUR PUB/SUB SUBSCRIBER NAME }}" + emailer: + emailServerConfig: + serviceName: sendgrid + apiKeyFilePath: /sendgrid/key + subject: "Flyte execution \"{{ name }}\" has {{ phase }} in \"{{ project }}\"." + sender: "{{ YOUR SENDGRID SENDER EMAIL }}" + body: View details at https://{{ YOUR FLYTE HOST }}/console/projects/{{ project }}/domains/{{ domain }}/executions/{{ name }} From aff319b223c21b160e4e6a72ac46b93dd5512098 Mon Sep 17 00:00:00 2001 From: ShengYu Date: Mon, 12 Aug 2024 14:12:02 +0800 Subject: [PATCH 19/29] Correct "sucessfile" to "successfile" (#5652) Signed-off-by: Bugra Gedik --- flytecopilot/cmd/sidecar_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytecopilot/cmd/sidecar_test.go b/flytecopilot/cmd/sidecar_test.go index 6d261e2c48..a7cc1c964a 100644 --- a/flytecopilot/cmd/sidecar_test.go +++ b/flytecopilot/cmd/sidecar_test.go @@ -90,7 +90,7 @@ func TestUploadOptions_Upload(t *testing.T) { assert.NoError(t, ioutil.WriteFile(success, []byte("done"), os.ModePerm)) ok, err := containerwatcher.FileExists(success) assert.NoError(t, err) - assert.True(t, ok, "sucessfile not created") + assert.True(t, ok, "successfile not created") assert.NoError(t, uopts.Sidecar(ctx)) v, err := store.Head(ctx, "/output/errors.pb") assert.NoError(t, err) From 492952230f3ffcdaee4889ec8db84e92c7f29152 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: 
Mon, 12 Aug 2024 18:10:16 +0200 Subject: [PATCH 20/29] Fix ordering for custom template values in cluster resource controller (#5648) Signed-off-by: Katrina Rogan Signed-off-by: Bugra Gedik --- flyteadmin/pkg/clusterresource/controller.go | 5 ++- .../pkg/clusterresource/controller_test.go | 32 +++++++++++++++++++ .../imagepullsecrets_templatized.yaml | 7 ++++ 3 files changed, 41 insertions(+), 3 deletions(-) create mode 100644 flyteadmin/pkg/clusterresource/testdata/imagepullsecrets_templatized.yaml diff --git a/flyteadmin/pkg/clusterresource/controller.go b/flyteadmin/pkg/clusterresource/controller.go index daad2600e8..6ea1731909 100644 --- a/flyteadmin/pkg/clusterresource/controller.go +++ b/flyteadmin/pkg/clusterresource/controller.go @@ -485,14 +485,13 @@ func (c *controller) createResourceFromTemplate(ctx context.Context, templateDir templateValues[fmt.Sprintf(templateVariableFormat, domainVariable)] = domain.Id var k8sManifest = string(template) - for templateKey, templateValue := range templateValues { + for templateKey, templateValue := range customTemplateValues { k8sManifest = strings.Replace(k8sManifest, templateKey, templateValue, replaceAllInstancesOfString) } // Replace remaining template variables from domain specific defaults. 
- for templateKey, templateValue := range customTemplateValues { + for templateKey, templateValue := range templateValues { k8sManifest = strings.Replace(k8sManifest, templateKey, templateValue, replaceAllInstancesOfString) } - return k8sManifest, nil } diff --git a/flyteadmin/pkg/clusterresource/controller_test.go b/flyteadmin/pkg/clusterresource/controller_test.go index dc3239cdc2..f6a966d6ef 100644 --- a/flyteadmin/pkg/clusterresource/controller_test.go +++ b/flyteadmin/pkg/clusterresource/controller_test.go @@ -293,6 +293,38 @@ kind: IAMServiceAccount metadata: name: my-project-dev-gsa namespace: my-project-dev +`, + wantErr: false, + }, + { + name: "test create resource from templatized imagepullsecrets.yaml", + args: args{ + ctx: context.Background(), + templateDir: "testdata", + templateFileName: "imagepullsecrets_templatized.yaml", + project: &admin.Project{ + Name: "my-project", + Id: "my-project", + }, + domain: &admin.Domain{ + Id: "dev", + Name: "dev", + }, + namespace: "my-project-dev", + templateValues: templateValuesType{ + "{{ imagePullSecretsName }}": "default", + }, + customTemplateValues: templateValuesType{ + "{{ imagePullSecretsName }}": "custom", + }, + }, + wantK8sManifest: `apiVersion: v1 +kind: ServiceAccount +metadata: + name: default + namespace: my-project-dev +imagePullSecrets: + - name: custom `, wantErr: false, }, diff --git a/flyteadmin/pkg/clusterresource/testdata/imagepullsecrets_templatized.yaml b/flyteadmin/pkg/clusterresource/testdata/imagepullsecrets_templatized.yaml new file mode 100644 index 0000000000..5c9d267382 --- /dev/null +++ b/flyteadmin/pkg/clusterresource/testdata/imagepullsecrets_templatized.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: default + namespace: {{ namespace }} +imagePullSecrets: + - name: {{ imagePullSecretsName }} From 5772d1f878506ecc2442801250c4fef81f2680bc Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Mon, 12 Aug 2024 18:57:23 +0200 Subject: [PATCH 21/29] Don't 
error when attempting to trigger schedules for inactive projects (#5649) * Don't error when attempting to trigger schedules for inactive projects Signed-off-by: Katrina Rogan * regen Signed-off-by: Katrina Rogan --------- Signed-off-by: Katrina Rogan Signed-off-by: Bugra Gedik --- flyteadmin/pkg/errors/errors.go | 12 ++ flyteadmin/pkg/errors/errors_test.go | 12 ++ .../impl/validation/project_validator.go | 3 +- .../scheduler/executor/executor_impl.go | 19 +++ .../scheduler/executor/executor_impl_test.go | 12 ++ .../gen/pb-es/flyteidl/admin/project_pb.ts | 50 +++++++ .../gen/pb-go/flyteidl/admin/project.pb.go | 114 +++++++++++++--- flyteidl/gen/pb-js/flyteidl.d.ts | 58 ++++++++ flyteidl/gen/pb-js/flyteidl.js | 127 ++++++++++++++++++ .../pb_python/flyteidl/admin/project_pb2.py | 4 +- .../pb_python/flyteidl/admin/project_pb2.pyi | 8 ++ flyteidl/gen/pb_rust/flyteidl.admin.rs | 12 ++ flyteidl/protos/flyteidl/admin/project.proto | 12 ++ 13 files changed, 421 insertions(+), 22 deletions(-) diff --git a/flyteadmin/pkg/errors/errors.go b/flyteadmin/pkg/errors/errors.go index 51e5ede579..78727a7305 100644 --- a/flyteadmin/pkg/errors/errors.go +++ b/flyteadmin/pkg/errors/errors.go @@ -202,3 +202,15 @@ func IsDoesNotExistError(err error) bool { adminError, ok := err.(FlyteAdminError) return ok && adminError.Code() == codes.NotFound } + +func NewInactiveProjectError(ctx context.Context, id string) FlyteAdminError { + errMsg := fmt.Sprintf("project [%s] is not active", id) + statusErr, transformationErr := NewFlyteAdminError(codes.InvalidArgument, errMsg).WithDetails(&admin.InactiveProject{ + Id: id, + }) + if transformationErr != nil { + logger.Errorf(ctx, "failed to wrap grpc status in type 'Error': %v", transformationErr) + return NewFlyteAdminErrorf(codes.InvalidArgument, errMsg) + } + return statusErr +} diff --git a/flyteadmin/pkg/errors/errors_test.go b/flyteadmin/pkg/errors/errors_test.go index c126f96d6d..daaa060340 100644 --- a/flyteadmin/pkg/errors/errors_test.go +++ 
b/flyteadmin/pkg/errors/errors_test.go @@ -310,3 +310,15 @@ func TestIsNotDoesNotExistError(t *testing.T) { func TestIsNotDoesNotExistErrorBecauseOfNoneAdminError(t *testing.T) { assert.False(t, IsDoesNotExistError(errors.New("foo"))) } + +func TestNewInactiveProjectError(t *testing.T) { + err := NewInactiveProjectError(context.TODO(), identifier.GetProject()) + statusErr, ok := status.FromError(err) + + assert.True(t, ok) + + details, ok := statusErr.Details()[0].(*admin.InactiveProject) + + assert.True(t, ok) + assert.Equal(t, identifier.GetProject(), details.Id) +} diff --git a/flyteadmin/pkg/manager/impl/validation/project_validator.go b/flyteadmin/pkg/manager/impl/validation/project_validator.go index 8577c13e2b..8a76ce889d 100644 --- a/flyteadmin/pkg/manager/impl/validation/project_validator.go +++ b/flyteadmin/pkg/manager/impl/validation/project_validator.go @@ -71,8 +71,7 @@ func ValidateProjectAndDomain( projectID, domainID, err) } if *project.State != int32(admin.Project_ACTIVE) { - return errors.NewFlyteAdminErrorf(codes.InvalidArgument, - "project [%s] is not active", projectID) + return errors.NewInactiveProjectError(ctx, projectID) } var validDomain bool domains := config.GetDomainsConfig() diff --git a/flyteadmin/scheduler/executor/executor_impl.go b/flyteadmin/scheduler/executor/executor_impl.go index 30ab7f0677..dffb98e1b6 100644 --- a/flyteadmin/scheduler/executor/executor_impl.go +++ b/flyteadmin/scheduler/executor/executor_impl.go @@ -114,6 +114,10 @@ func (w *executor) Execute(ctx context.Context, scheduledTime time.Time, s model }, func() error { _, execErr := w.adminServiceClient.CreateExecution(context.Background(), executionRequest) + if isInactiveProjectError(execErr) { + logger.Debugf(ctx, "project %+v is inactive, ignoring schedule create failure for %+v", s.Project, s) + return nil + } return execErr }, ) @@ -144,3 +148,18 @@ func getExecutorMetrics(scope promutils.Scope) executorMetrics { "count of successful attempts to fire execution 
for a schedules"), } } + +func isInactiveProjectError(err error) bool { + statusErr, ok := status.FromError(err) + if !ok { + return false + } + if len(statusErr.Details()) > 0 { + for _, detail := range statusErr.Details() { + if _, ok := detail.(*admin.InactiveProject); ok { + return true + } + } + } + return false +} diff --git a/flyteadmin/scheduler/executor/executor_impl_test.go b/flyteadmin/scheduler/executor/executor_impl_test.go index e864d68d79..fc75367ca9 100644 --- a/flyteadmin/scheduler/executor/executor_impl_test.go +++ b/flyteadmin/scheduler/executor/executor_impl_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/flyteorg/flyte/flyteadmin/pkg/errors" "github.com/flyteorg/flyte/flyteadmin/scheduler/repositories/models" @@ -98,3 +99,14 @@ func TestExecutorInactiveSchedule(t *testing.T) { err := executor.Execute(context.Background(), time.Now(), schedule) assert.Nil(t, err) } + +func TestIsInactiveProjectError(t *testing.T) { + statusErr := status.New(codes.InvalidArgument, "foo") + var transformationErr error + statusErr, transformationErr = statusErr.WithDetails(&admin.InactiveProject{ + Id: "project", + }) + assert.NoError(t, transformationErr) + + assert.True(t, isInactiveProjectError(statusErr.Err())) +} diff --git a/flyteidl/gen/pb-es/flyteidl/admin/project_pb.ts b/flyteidl/gen/pb-es/flyteidl/admin/project_pb.ts index 11f2726e08..a6fc913c03 100644 --- a/flyteidl/gen/pb-es/flyteidl/admin/project_pb.ts +++ b/flyteidl/gen/pb-es/flyteidl/admin/project_pb.ts @@ -540,3 +540,53 @@ export class ProjectGetRequest extends Message { } } +/** + * Error returned for inactive projects + * + * @generated from message flyteidl.admin.InactiveProject + */ +export class InactiveProject extends Message { + /** + * Indicates a unique project. 
+ * +required + * + * @generated from field: string id = 1; + */ + id = ""; + + /** + * Optional, org key applied to the resource. + * + * @generated from field: string org = 2; + */ + org = ""; + + constructor(data?: PartialMessage) { + super(); + proto3.util.initPartial(data, this); + } + + static readonly runtime: typeof proto3 = proto3; + static readonly typeName = "flyteidl.admin.InactiveProject"; + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: "id", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 2, name: "org", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): InactiveProject { + return new InactiveProject().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): InactiveProject { + return new InactiveProject().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): InactiveProject { + return new InactiveProject().fromJsonString(jsonString, options); + } + + static equals(a: InactiveProject | PlainMessage | undefined, b: InactiveProject | PlainMessage | undefined): boolean { + return proto3.util.equals(InactiveProject, a, b); + } +} + diff --git a/flyteidl/gen/pb-go/flyteidl/admin/project.pb.go b/flyteidl/gen/pb-go/flyteidl/admin/project.pb.go index 243f46bf5d..d34451452b 100644 --- a/flyteidl/gen/pb-go/flyteidl/admin/project.pb.go +++ b/flyteidl/gen/pb-go/flyteidl/admin/project.pb.go @@ -661,6 +661,65 @@ func (x *ProjectGetRequest) GetOrg() string { return "" } +// Error returned for inactive projects +type InactiveProject struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Indicates a unique project. + // +required + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + // Optional, org key applied to the resource. 
+ Org string `protobuf:"bytes,2,opt,name=org,proto3" json:"org,omitempty"` +} + +func (x *InactiveProject) Reset() { + *x = InactiveProject{} + if protoimpl.UnsafeEnabled { + mi := &file_flyteidl_admin_project_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *InactiveProject) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*InactiveProject) ProtoMessage() {} + +func (x *InactiveProject) ProtoReflect() protoreflect.Message { + mi := &file_flyteidl_admin_project_proto_msgTypes[10] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use InactiveProject.ProtoReflect.Descriptor instead. +func (*InactiveProject) Descriptor() ([]byte, []int) { + return file_flyteidl_admin_project_proto_rawDescGZIP(), []int{10} +} + +func (x *InactiveProject) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *InactiveProject) GetOrg() string { + if x != nil { + return x.Org + } + return "" +} + var File_flyteidl_admin_project_proto protoreflect.FileDescriptor var file_flyteidl_admin_project_proto_rawDesc = []byte{ @@ -725,19 +784,23 @@ var file_flyteidl_admin_project_proto_rawDesc = []byte{ 0x73, 0x65, 0x22, 0x35, 0x0a, 0x11, 0x50, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x6f, 0x72, 0x67, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6f, 0x72, 0x67, 0x42, 0xb8, 0x01, 0x0a, 0x12, 0x63, 0x6f, - 0x6d, 0x2e, 0x66, 0x6c, 0x79, 0x74, 0x65, 0x69, 0x64, 0x6c, 0x2e, 0x61, 0x64, 0x6d, 0x69, 0x6e, - 0x42, 0x0c, 0x50, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, - 0x5a, 0x3b, 0x67, 0x69, 0x74, 0x68, 
0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x66, 0x6c, 0x79, - 0x74, 0x65, 0x6f, 0x72, 0x67, 0x2f, 0x66, 0x6c, 0x79, 0x74, 0x65, 0x2f, 0x66, 0x6c, 0x79, 0x74, - 0x65, 0x69, 0x64, 0x6c, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x62, 0x2d, 0x67, 0x6f, 0x2f, 0x66, - 0x6c, 0x79, 0x74, 0x65, 0x69, 0x64, 0x6c, 0x2f, 0x61, 0x64, 0x6d, 0x69, 0x6e, 0xa2, 0x02, 0x03, - 0x46, 0x41, 0x58, 0xaa, 0x02, 0x0e, 0x46, 0x6c, 0x79, 0x74, 0x65, 0x69, 0x64, 0x6c, 0x2e, 0x41, - 0x64, 0x6d, 0x69, 0x6e, 0xca, 0x02, 0x0e, 0x46, 0x6c, 0x79, 0x74, 0x65, 0x69, 0x64, 0x6c, 0x5c, - 0x41, 0x64, 0x6d, 0x69, 0x6e, 0xe2, 0x02, 0x1a, 0x46, 0x6c, 0x79, 0x74, 0x65, 0x69, 0x64, 0x6c, - 0x5c, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, - 0x74, 0x61, 0xea, 0x02, 0x0f, 0x46, 0x6c, 0x79, 0x74, 0x65, 0x69, 0x64, 0x6c, 0x3a, 0x3a, 0x41, - 0x64, 0x6d, 0x69, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6f, 0x72, 0x67, 0x22, 0x33, 0x0a, 0x0f, 0x49, 0x6e, 0x61, + 0x63, 0x74, 0x69, 0x76, 0x65, 0x50, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x12, 0x0e, 0x0a, 0x02, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x10, 0x0a, 0x03, + 0x6f, 0x72, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6f, 0x72, 0x67, 0x42, 0xb8, + 0x01, 0x0a, 0x12, 0x63, 0x6f, 0x6d, 0x2e, 0x66, 0x6c, 0x79, 0x74, 0x65, 0x69, 0x64, 0x6c, 0x2e, + 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x42, 0x0c, 0x50, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x50, 0x72, + 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x3b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x66, 0x6c, 0x79, 0x74, 0x65, 0x6f, 0x72, 0x67, 0x2f, 0x66, 0x6c, 0x79, 0x74, 0x65, + 0x2f, 0x66, 0x6c, 0x79, 0x74, 0x65, 0x69, 0x64, 0x6c, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x62, + 0x2d, 0x67, 0x6f, 0x2f, 0x66, 0x6c, 0x79, 0x74, 0x65, 0x69, 0x64, 0x6c, 0x2f, 0x61, 0x64, 0x6d, + 0x69, 0x6e, 0xa2, 0x02, 0x03, 0x46, 0x41, 0x58, 0xaa, 0x02, 0x0e, 0x46, 0x6c, 0x79, 0x74, 0x65, + 
0x69, 0x64, 0x6c, 0x2e, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0xca, 0x02, 0x0e, 0x46, 0x6c, 0x79, 0x74, + 0x65, 0x69, 0x64, 0x6c, 0x5c, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0xe2, 0x02, 0x1a, 0x46, 0x6c, 0x79, + 0x74, 0x65, 0x69, 0x64, 0x6c, 0x5c, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x5c, 0x47, 0x50, 0x42, 0x4d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0f, 0x46, 0x6c, 0x79, 0x74, 0x65, 0x69, + 0x64, 0x6c, 0x3a, 0x3a, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( @@ -753,7 +816,7 @@ func file_flyteidl_admin_project_proto_rawDescGZIP() []byte { } var file_flyteidl_admin_project_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_flyteidl_admin_project_proto_msgTypes = make([]protoimpl.MessageInfo, 10) +var file_flyteidl_admin_project_proto_msgTypes = make([]protoimpl.MessageInfo, 11) var file_flyteidl_admin_project_proto_goTypes = []interface{}{ (Project_ProjectState)(0), // 0: flyteidl.admin.Project.ProjectState (*GetDomainRequest)(nil), // 1: flyteidl.admin.GetDomainRequest @@ -766,16 +829,17 @@ var file_flyteidl_admin_project_proto_goTypes = []interface{}{ (*ProjectRegisterResponse)(nil), // 8: flyteidl.admin.ProjectRegisterResponse (*ProjectUpdateResponse)(nil), // 9: flyteidl.admin.ProjectUpdateResponse (*ProjectGetRequest)(nil), // 10: flyteidl.admin.ProjectGetRequest - (*Labels)(nil), // 11: flyteidl.admin.Labels - (*Sort)(nil), // 12: flyteidl.admin.Sort + (*InactiveProject)(nil), // 11: flyteidl.admin.InactiveProject + (*Labels)(nil), // 12: flyteidl.admin.Labels + (*Sort)(nil), // 13: flyteidl.admin.Sort } var file_flyteidl_admin_project_proto_depIdxs = []int32{ 2, // 0: flyteidl.admin.GetDomainsResponse.domains:type_name -> flyteidl.admin.Domain 2, // 1: flyteidl.admin.Project.domains:type_name -> flyteidl.admin.Domain - 11, // 2: flyteidl.admin.Project.labels:type_name -> flyteidl.admin.Labels + 12, // 2: flyteidl.admin.Project.labels:type_name -> flyteidl.admin.Labels 0, // 3: 
flyteidl.admin.Project.state:type_name -> flyteidl.admin.Project.ProjectState 4, // 4: flyteidl.admin.Projects.projects:type_name -> flyteidl.admin.Project - 12, // 5: flyteidl.admin.ProjectListRequest.sort_by:type_name -> flyteidl.admin.Sort + 13, // 5: flyteidl.admin.ProjectListRequest.sort_by:type_name -> flyteidl.admin.Sort 4, // 6: flyteidl.admin.ProjectRegisterRequest.project:type_name -> flyteidl.admin.Project 7, // [7:7] is the sub-list for method output_type 7, // [7:7] is the sub-list for method input_type @@ -911,6 +975,18 @@ func file_flyteidl_admin_project_proto_init() { return nil } } + file_flyteidl_admin_project_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*InactiveProject); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -918,7 +994,7 @@ func file_flyteidl_admin_project_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_flyteidl_admin_project_proto_rawDesc, NumEnums: 1, - NumMessages: 10, + NumMessages: 11, NumExtensions: 0, NumServices: 0, }, diff --git a/flyteidl/gen/pb-js/flyteidl.d.ts b/flyteidl/gen/pb-js/flyteidl.d.ts index a1d9a34637..ada71f1f09 100644 --- a/flyteidl/gen/pb-js/flyteidl.d.ts +++ b/flyteidl/gen/pb-js/flyteidl.d.ts @@ -18184,6 +18184,64 @@ export namespace flyteidl { public static verify(message: { [k: string]: any }): (string|null); } + /** Properties of an InactiveProject. */ + interface IInactiveProject { + + /** InactiveProject id */ + id?: (string|null); + + /** InactiveProject org */ + org?: (string|null); + } + + /** Represents an InactiveProject. */ + class InactiveProject implements IInactiveProject { + + /** + * Constructs a new InactiveProject. + * @param [properties] Properties to set + */ + constructor(properties?: flyteidl.admin.IInactiveProject); + + /** InactiveProject id. 
*/ + public id: string; + + /** InactiveProject org. */ + public org: string; + + /** + * Creates a new InactiveProject instance using the specified properties. + * @param [properties] Properties to set + * @returns InactiveProject instance + */ + public static create(properties?: flyteidl.admin.IInactiveProject): flyteidl.admin.InactiveProject; + + /** + * Encodes the specified InactiveProject message. Does not implicitly {@link flyteidl.admin.InactiveProject.verify|verify} messages. + * @param message InactiveProject message or plain object to encode + * @param [writer] Writer to encode to + * @returns Writer + */ + public static encode(message: flyteidl.admin.IInactiveProject, writer?: $protobuf.Writer): $protobuf.Writer; + + /** + * Decodes an InactiveProject message from the specified reader or buffer. + * @param reader Reader or buffer to decode from + * @param [length] Message length if known beforehand + * @returns InactiveProject + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + public static decode(reader: ($protobuf.Reader|Uint8Array), length?: number): flyteidl.admin.InactiveProject; + + /** + * Verifies an InactiveProject message. + * @param message Plain object to verify + * @returns `null` if valid, otherwise the reason why it is not + */ + public static verify(message: { [k: string]: any }): (string|null); + } + /** Properties of a ProjectAttributes. */ interface IProjectAttributes { diff --git a/flyteidl/gen/pb-js/flyteidl.js b/flyteidl/gen/pb-js/flyteidl.js index 3402b1bdbb..8f446b4aba 100644 --- a/flyteidl/gen/pb-js/flyteidl.js +++ b/flyteidl/gen/pb-js/flyteidl.js @@ -43952,6 +43952,133 @@ return ProjectGetRequest; })(); + admin.InactiveProject = (function() { + + /** + * Properties of an InactiveProject. 
+ * @memberof flyteidl.admin + * @interface IInactiveProject + * @property {string|null} [id] InactiveProject id + * @property {string|null} [org] InactiveProject org + */ + + /** + * Constructs a new InactiveProject. + * @memberof flyteidl.admin + * @classdesc Represents an InactiveProject. + * @implements IInactiveProject + * @constructor + * @param {flyteidl.admin.IInactiveProject=} [properties] Properties to set + */ + function InactiveProject(properties) { + if (properties) + for (var keys = Object.keys(properties), i = 0; i < keys.length; ++i) + if (properties[keys[i]] != null) + this[keys[i]] = properties[keys[i]]; + } + + /** + * InactiveProject id. + * @member {string} id + * @memberof flyteidl.admin.InactiveProject + * @instance + */ + InactiveProject.prototype.id = ""; + + /** + * InactiveProject org. + * @member {string} org + * @memberof flyteidl.admin.InactiveProject + * @instance + */ + InactiveProject.prototype.org = ""; + + /** + * Creates a new InactiveProject instance using the specified properties. + * @function create + * @memberof flyteidl.admin.InactiveProject + * @static + * @param {flyteidl.admin.IInactiveProject=} [properties] Properties to set + * @returns {flyteidl.admin.InactiveProject} InactiveProject instance + */ + InactiveProject.create = function create(properties) { + return new InactiveProject(properties); + }; + + /** + * Encodes the specified InactiveProject message. Does not implicitly {@link flyteidl.admin.InactiveProject.verify|verify} messages. 
+ * @function encode + * @memberof flyteidl.admin.InactiveProject + * @static + * @param {flyteidl.admin.IInactiveProject} message InactiveProject message or plain object to encode + * @param {$protobuf.Writer} [writer] Writer to encode to + * @returns {$protobuf.Writer} Writer + */ + InactiveProject.encode = function encode(message, writer) { + if (!writer) + writer = $Writer.create(); + if (message.id != null && message.hasOwnProperty("id")) + writer.uint32(/* id 1, wireType 2 =*/10).string(message.id); + if (message.org != null && message.hasOwnProperty("org")) + writer.uint32(/* id 2, wireType 2 =*/18).string(message.org); + return writer; + }; + + /** + * Decodes an InactiveProject message from the specified reader or buffer. + * @function decode + * @memberof flyteidl.admin.InactiveProject + * @static + * @param {$protobuf.Reader|Uint8Array} reader Reader or buffer to decode from + * @param {number} [length] Message length if known beforehand + * @returns {flyteidl.admin.InactiveProject} InactiveProject + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + InactiveProject.decode = function decode(reader, length) { + if (!(reader instanceof $Reader)) + reader = $Reader.create(reader); + var end = length === undefined ? reader.len : reader.pos + length, message = new $root.flyteidl.admin.InactiveProject(); + while (reader.pos < end) { + var tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + message.id = reader.string(); + break; + case 2: + message.org = reader.string(); + break; + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }; + + /** + * Verifies an InactiveProject message. 
+ * @function verify + * @memberof flyteidl.admin.InactiveProject + * @static + * @param {Object.} message Plain object to verify + * @returns {string|null} `null` if valid, otherwise the reason why it is not + */ + InactiveProject.verify = function verify(message) { + if (typeof message !== "object" || message === null) + return "object expected"; + if (message.id != null && message.hasOwnProperty("id")) + if (!$util.isString(message.id)) + return "id: string expected"; + if (message.org != null && message.hasOwnProperty("org")) + if (!$util.isString(message.org)) + return "org: string expected"; + return null; + }; + + return InactiveProject; + })(); + admin.ProjectAttributes = (function() { /** diff --git a/flyteidl/gen/pb_python/flyteidl/admin/project_pb2.py b/flyteidl/gen/pb_python/flyteidl/admin/project_pb2.py index 885ef84716..c04fdb67e1 100644 --- a/flyteidl/gen/pb_python/flyteidl/admin/project_pb2.py +++ b/flyteidl/gen/pb_python/flyteidl/admin/project_pb2.py @@ -14,7 +14,7 @@ from flyteidl.admin import common_pb2 as flyteidl_dot_admin_dot_common__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66lyteidl/admin/project.proto\x12\x0e\x66lyteidl.admin\x1a\x1b\x66lyteidl/admin/common.proto\"\x12\n\x10GetDomainRequest\",\n\x06\x44omain\x12\x0e\n\x02id\x18\x01 \x01(\tR\x02id\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name\"F\n\x12GetDomainsResponse\x12\x30\n\x07\x64omains\x18\x01 \x03(\x0b\x32\x16.flyteidl.admin.DomainR\x07\x64omains\"\xd4\x02\n\x07Project\x12\x0e\n\x02id\x18\x01 \x01(\tR\x02id\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name\x12\x30\n\x07\x64omains\x18\x03 \x03(\x0b\x32\x16.flyteidl.admin.DomainR\x07\x64omains\x12 \n\x0b\x64\x65scription\x18\x04 \x01(\tR\x0b\x64\x65scription\x12.\n\x06labels\x18\x05 \x01(\x0b\x32\x16.flyteidl.admin.LabelsR\x06labels\x12:\n\x05state\x18\x06 \x01(\x0e\x32$.flyteidl.admin.Project.ProjectStateR\x05state\x12\x10\n\x03org\x18\x07 
\x01(\tR\x03org\"S\n\x0cProjectState\x12\n\n\x06\x41\x43TIVE\x10\x00\x12\x0c\n\x08\x41RCHIVED\x10\x01\x12\x14\n\x10SYSTEM_GENERATED\x10\x02\x12\x13\n\x0fSYSTEM_ARCHIVED\x10\x03\"U\n\x08Projects\x12\x33\n\x08projects\x18\x01 \x03(\x0b\x32\x17.flyteidl.admin.ProjectR\x08projects\x12\x14\n\x05token\x18\x02 \x01(\tR\x05token\"\x9b\x01\n\x12ProjectListRequest\x12\x14\n\x05limit\x18\x01 \x01(\rR\x05limit\x12\x14\n\x05token\x18\x02 \x01(\tR\x05token\x12\x18\n\x07\x66ilters\x18\x03 \x01(\tR\x07\x66ilters\x12-\n\x07sort_by\x18\x04 \x01(\x0b\x32\x14.flyteidl.admin.SortR\x06sortBy\x12\x10\n\x03org\x18\x05 \x01(\tR\x03org\"K\n\x16ProjectRegisterRequest\x12\x31\n\x07project\x18\x01 \x01(\x0b\x32\x17.flyteidl.admin.ProjectR\x07project\"\x19\n\x17ProjectRegisterResponse\"\x17\n\x15ProjectUpdateResponse\"5\n\x11ProjectGetRequest\x12\x0e\n\x02id\x18\x01 \x01(\tR\x02id\x12\x10\n\x03org\x18\x02 \x01(\tR\x03orgB\xb8\x01\n\x12\x63om.flyteidl.adminB\x0cProjectProtoP\x01Z;github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin\xa2\x02\x03\x46\x41X\xaa\x02\x0e\x46lyteidl.Admin\xca\x02\x0e\x46lyteidl\\Admin\xe2\x02\x1a\x46lyteidl\\Admin\\GPBMetadata\xea\x02\x0f\x46lyteidl::Adminb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66lyteidl/admin/project.proto\x12\x0e\x66lyteidl.admin\x1a\x1b\x66lyteidl/admin/common.proto\"\x12\n\x10GetDomainRequest\",\n\x06\x44omain\x12\x0e\n\x02id\x18\x01 \x01(\tR\x02id\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name\"F\n\x12GetDomainsResponse\x12\x30\n\x07\x64omains\x18\x01 \x03(\x0b\x32\x16.flyteidl.admin.DomainR\x07\x64omains\"\xd4\x02\n\x07Project\x12\x0e\n\x02id\x18\x01 \x01(\tR\x02id\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name\x12\x30\n\x07\x64omains\x18\x03 \x03(\x0b\x32\x16.flyteidl.admin.DomainR\x07\x64omains\x12 \n\x0b\x64\x65scription\x18\x04 \x01(\tR\x0b\x64\x65scription\x12.\n\x06labels\x18\x05 \x01(\x0b\x32\x16.flyteidl.admin.LabelsR\x06labels\x12:\n\x05state\x18\x06 
\x01(\x0e\x32$.flyteidl.admin.Project.ProjectStateR\x05state\x12\x10\n\x03org\x18\x07 \x01(\tR\x03org\"S\n\x0cProjectState\x12\n\n\x06\x41\x43TIVE\x10\x00\x12\x0c\n\x08\x41RCHIVED\x10\x01\x12\x14\n\x10SYSTEM_GENERATED\x10\x02\x12\x13\n\x0fSYSTEM_ARCHIVED\x10\x03\"U\n\x08Projects\x12\x33\n\x08projects\x18\x01 \x03(\x0b\x32\x17.flyteidl.admin.ProjectR\x08projects\x12\x14\n\x05token\x18\x02 \x01(\tR\x05token\"\x9b\x01\n\x12ProjectListRequest\x12\x14\n\x05limit\x18\x01 \x01(\rR\x05limit\x12\x14\n\x05token\x18\x02 \x01(\tR\x05token\x12\x18\n\x07\x66ilters\x18\x03 \x01(\tR\x07\x66ilters\x12-\n\x07sort_by\x18\x04 \x01(\x0b\x32\x14.flyteidl.admin.SortR\x06sortBy\x12\x10\n\x03org\x18\x05 \x01(\tR\x03org\"K\n\x16ProjectRegisterRequest\x12\x31\n\x07project\x18\x01 \x01(\x0b\x32\x17.flyteidl.admin.ProjectR\x07project\"\x19\n\x17ProjectRegisterResponse\"\x17\n\x15ProjectUpdateResponse\"5\n\x11ProjectGetRequest\x12\x0e\n\x02id\x18\x01 \x01(\tR\x02id\x12\x10\n\x03org\x18\x02 \x01(\tR\x03org\"3\n\x0fInactiveProject\x12\x0e\n\x02id\x18\x01 \x01(\tR\x02id\x12\x10\n\x03org\x18\x02 \x01(\tR\x03orgB\xb8\x01\n\x12\x63om.flyteidl.adminB\x0cProjectProtoP\x01Z;github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin\xa2\x02\x03\x46\x41X\xaa\x02\x0e\x46lyteidl.Admin\xca\x02\x0e\x46lyteidl\\Admin\xe2\x02\x1a\x46lyteidl\\Admin\\GPBMetadata\xea\x02\x0f\x46lyteidl::Adminb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -45,4 +45,6 @@ _globals['_PROJECTUPDATERESPONSE']._serialized_end=930 _globals['_PROJECTGETREQUEST']._serialized_start=932 _globals['_PROJECTGETREQUEST']._serialized_end=985 + _globals['_INACTIVEPROJECT']._serialized_start=987 + _globals['_INACTIVEPROJECT']._serialized_end=1038 # @@protoc_insertion_point(module_scope) diff --git a/flyteidl/gen/pb_python/flyteidl/admin/project_pb2.pyi b/flyteidl/gen/pb_python/flyteidl/admin/project_pb2.pyi index 12750a8959..c775c5aac8 100644 --- 
a/flyteidl/gen/pb_python/flyteidl/admin/project_pb2.pyi +++ b/flyteidl/gen/pb_python/flyteidl/admin/project_pb2.pyi @@ -96,3 +96,11 @@ class ProjectGetRequest(_message.Message): id: str org: str def __init__(self, id: _Optional[str] = ..., org: _Optional[str] = ...) -> None: ... + +class InactiveProject(_message.Message): + __slots__ = ["id", "org"] + ID_FIELD_NUMBER: _ClassVar[int] + ORG_FIELD_NUMBER: _ClassVar[int] + id: str + org: str + def __init__(self, id: _Optional[str] = ..., org: _Optional[str] = ...) -> None: ... diff --git a/flyteidl/gen/pb_rust/flyteidl.admin.rs b/flyteidl/gen/pb_rust/flyteidl.admin.rs index dcbf3b5df7..ca3270264b 100644 --- a/flyteidl/gen/pb_rust/flyteidl.admin.rs +++ b/flyteidl/gen/pb_rust/flyteidl.admin.rs @@ -2615,6 +2615,18 @@ pub struct ProjectGetRequest { #[prost(string, tag="2")] pub org: ::prost::alloc::string::String, } +/// Error returned for inactive projects +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct InactiveProject { + /// Indicates a unique project. + /// +required + #[prost(string, tag="1")] + pub id: ::prost::alloc::string::String, + /// Optional, org key applied to the resource. + #[prost(string, tag="2")] + pub org: ::prost::alloc::string::String, +} /// Defines a set of custom matching attributes at the project level. /// For more info on matchable attributes, see :ref:`ref_flyteidl.admin.MatchableAttributesConfiguration` #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/flyteidl/protos/flyteidl/admin/project.proto b/flyteidl/protos/flyteidl/admin/project.proto index bbaccd70ff..8b994b7267 100644 --- a/flyteidl/protos/flyteidl/admin/project.proto +++ b/flyteidl/protos/flyteidl/admin/project.proto @@ -118,3 +118,15 @@ message ProjectGetRequest { // Optional, org key applied to the resource. string org = 2; } + + +// Error returned for inactive projects +message InactiveProject { + // Indicates a unique project. 
+ // +required + string id = 1; + + // Optional, org key applied to the resource. + string org = 2; +} + From 2ddb4d28252d1f218a820b1d436e4a46effd3534 Mon Sep 17 00:00:00 2001 From: Bugra Gedik Date: Thu, 15 Aug 2024 19:40:26 +0000 Subject: [PATCH 22/29] fix tests Signed-off-by: Bugra Gedik --- flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go b/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go index 7ea6c42be2..d657d4c273 100644 --- a/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go @@ -853,7 +853,7 @@ func TestBuildResourcePodTemplate(t *testing.T) { assert.Equal(t, defaultConfig.DefaultEnvVars["foo"], findEnvVarByName(sparkApp.Spec.Driver.Env, "foo").Value) assert.Equal(t, defaultConfig.DefaultEnvVars["fooEnv"], findEnvVarByName(sparkApp.Spec.Driver.Env, "fooEnv").Value) assert.Equal(t, findEnvVarByName(dummyEnvVarsWithSecretRef, "SECRET"), findEnvVarByName(sparkApp.Spec.Driver.Env, "SECRET")) - assert.Equal(t, 9, len(sparkApp.Spec.Driver.Env)) + assert.Equal(t, 10, len(sparkApp.Spec.Driver.Env)) assert.Equal(t, testImage, *sparkApp.Spec.Driver.Image) assert.Equal(t, flytek8s.GetServiceAccountNameFromTaskExecutionMetadata(taskCtx.TaskExecutionMetadata()), *sparkApp.Spec.Driver.ServiceAccount) assert.Equal(t, defaultConfig.DefaultPodSecurityContext, sparkApp.Spec.Driver.SecurityContenxt) @@ -890,7 +890,7 @@ func TestBuildResourcePodTemplate(t *testing.T) { assert.Equal(t, defaultConfig.DefaultEnvVars["foo"], findEnvVarByName(sparkApp.Spec.Executor.Env, "foo").Value) assert.Equal(t, defaultConfig.DefaultEnvVars["fooEnv"], findEnvVarByName(sparkApp.Spec.Executor.Env, "fooEnv").Value) assert.Equal(t, findEnvVarByName(dummyEnvVarsWithSecretRef, "SECRET"), findEnvVarByName(sparkApp.Spec.Executor.Env, "SECRET")) - assert.Equal(t, 9, len(sparkApp.Spec.Executor.Env)) + 
assert.Equal(t, 10, len(sparkApp.Spec.Executor.Env)) assert.Equal(t, testImage, *sparkApp.Spec.Executor.Image) assert.Equal(t, defaultConfig.DefaultPodSecurityContext, sparkApp.Spec.Executor.SecurityContenxt) assert.Equal(t, defaultConfig.DefaultPodDNSConfig, sparkApp.Spec.Executor.DNSConfig) From 1b24ca2c73cc9932bdc37a5da150517759c3d17c Mon Sep 17 00:00:00 2001 From: Bugra Gedik Date: Tue, 20 Aug 2024 00:20:50 +0000 Subject: [PATCH 23/29] change to shorter names Signed-off-by: Bugra Gedik --- .../go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go index b0025fdddf..a280106ff4 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go @@ -61,7 +61,7 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID, consoleURL string) []v1 Value: nodeExecutionID.Domain, }, { - Name: "FLYTE_INTERNAL_POD_NAME", + Name: "_F_PN", ValueFrom: &v1.EnvVarSource{ FieldRef: &v1.ObjectFieldSelector{ FieldPath: "metadata.name", From ede753656ca15adb26e6224dde437e978fcd3d53 Mon Sep 17 00:00:00 2001 From: Bugra Gedik Date: Tue, 20 Aug 2024 00:47:17 +0000 Subject: [PATCH 24/29] change to shorter names Signed-off-by: Bugra Gedik --- .../pluginmachinery/flytek8s/k8s_resource_adds.go | 1 + flytestdlib/storage/storage.go | 12 ++++++++++-- flytestdlib/storage/stow_store.go | 14 ++++++++++++++ 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go index a280106ff4..5d145123de 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go @@ -61,6 +61,7 @@ func 
GetExecutionEnvVars(id pluginsCore.TaskExecutionID, consoleURL string) []v1 Value: nodeExecutionID.Domain, }, { + # FLYTE_INTERNAL_POD_NAME Name: "_F_PN", ValueFrom: &v1.EnvVarSource{ FieldRef: &v1.ObjectFieldSelector{ diff --git a/flytestdlib/storage/storage.go b/flytestdlib/storage/storage.go index 3e84cb7acb..3706d97b8c 100644 --- a/flytestdlib/storage/storage.go +++ b/flytestdlib/storage/storage.go @@ -40,8 +40,13 @@ type Metadata interface { ContentMD5() string } -// DataStore is a simplified interface for accessing and storing data in one of the Cloud stores. -// Today we rely on Stow for multi-cloud support, but this interface abstracts that part +type Cursor interface { + IsStartCursor() bool + IsEndCursor() bool + MoveToStart() + MoveToEnd() +} + type DataStore struct { ComposedProtobufStore ReferenceConstructor @@ -78,6 +83,9 @@ type RawStore interface { // Head gets metadata about the reference. This should generally be a light weight operation. Head(ctx context.Context, reference DataReference) (Metadata, error) + // List gets a list of items given a prefix, using a paginated API + List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]Metadata, Cursor, error) + // ReadRaw retrieves a byte array from the Blob store or an error ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) diff --git a/flytestdlib/storage/stow_store.go b/flytestdlib/storage/stow_store.go index ce4a75a0a1..1507c24bd6 100644 --- a/flytestdlib/storage/stow_store.go +++ b/flytestdlib/storage/stow_store.go @@ -126,6 +126,16 @@ func (s StowMetadata) ContentMD5() string { return s.contentMD5 } +type StowCursor struct { + value string +} + +func (s StowCursor) IsStartCursor() bool + +IsEndCursor() bool +MoveToStart() +MoveToEnd() + // Implements DataStore to talk to stow location store. 
type StowStore struct { copyImpl @@ -251,6 +261,10 @@ func (s *StowStore) Head(ctx context.Context, reference DataReference) (Metadata return StowMetadata{exists: false}, errs.Wrapf(err, "path:%v", k) } +func (s *StowStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]Metadata, Cursor, error) { + // TODO +} + func (s *StowStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) { _, c, k, err := reference.Split() if err != nil { From 7a146490f3e2f122cc0cc2e1e8961c2dd43418d9 Mon Sep 17 00:00:00 2001 From: Bugra Gedik Date: Tue, 20 Aug 2024 05:21:04 +0000 Subject: [PATCH 25/29] change to shorter names Signed-off-by: Bugra Gedik --- flytestdlib/storage/storage.go | 10 ---------- flytestdlib/storage/stow_store.go | 10 ---------- 2 files changed, 20 deletions(-) diff --git a/flytestdlib/storage/storage.go b/flytestdlib/storage/storage.go index 3706d97b8c..f80dd49ab7 100644 --- a/flytestdlib/storage/storage.go +++ b/flytestdlib/storage/storage.go @@ -40,13 +40,6 @@ type Metadata interface { ContentMD5() string } -type Cursor interface { - IsStartCursor() bool - IsEndCursor() bool - MoveToStart() - MoveToEnd() -} - type DataStore struct { ComposedProtobufStore ReferenceConstructor @@ -83,9 +76,6 @@ type RawStore interface { // Head gets metadata about the reference. This should generally be a light weight operation. 
Head(ctx context.Context, reference DataReference) (Metadata, error) - // List gets a list of items given a prefix, using a paginated API - List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]Metadata, Cursor, error) - // ReadRaw retrieves a byte array from the Blob store or an error ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) diff --git a/flytestdlib/storage/stow_store.go b/flytestdlib/storage/stow_store.go index 1507c24bd6..219a566556 100644 --- a/flytestdlib/storage/stow_store.go +++ b/flytestdlib/storage/stow_store.go @@ -126,16 +126,6 @@ func (s StowMetadata) ContentMD5() string { return s.contentMD5 } -type StowCursor struct { - value string -} - -func (s StowCursor) IsStartCursor() bool - -IsEndCursor() bool -MoveToStart() -MoveToEnd() - // Implements DataStore to talk to stow location store. type StowStore struct { copyImpl From 783d75eae1968542baa2c00133d745177bc7863e Mon Sep 17 00:00:00 2001 From: Bugra Gedik Date: Tue, 20 Aug 2024 05:21:50 +0000 Subject: [PATCH 26/29] change to shorter names Signed-off-by: Bugra Gedik --- flytestdlib/storage/storage.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flytestdlib/storage/storage.go b/flytestdlib/storage/storage.go index f80dd49ab7..3e84cb7acb 100644 --- a/flytestdlib/storage/storage.go +++ b/flytestdlib/storage/storage.go @@ -40,6 +40,8 @@ type Metadata interface { ContentMD5() string } +// DataStore is a simplified interface for accessing and storing data in one of the Cloud stores. 
+// Today we rely on Stow for multi-cloud support, but this interface abstracts that part type DataStore struct { ComposedProtobufStore ReferenceConstructor From c89950fe9fcb8ef791ef277ae92e02c99b4ac013 Mon Sep 17 00:00:00 2001 From: Bugra Gedik Date: Tue, 20 Aug 2024 05:22:16 +0000 Subject: [PATCH 27/29] change to shorter names Signed-off-by: Bugra Gedik --- flytestdlib/storage/stow_store.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/flytestdlib/storage/stow_store.go b/flytestdlib/storage/stow_store.go index 219a566556..ce4a75a0a1 100644 --- a/flytestdlib/storage/stow_store.go +++ b/flytestdlib/storage/stow_store.go @@ -251,10 +251,6 @@ func (s *StowStore) Head(ctx context.Context, reference DataReference) (Metadata return StowMetadata{exists: false}, errs.Wrapf(err, "path:%v", k) } -func (s *StowStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]Metadata, Cursor, error) { - // TODO -} - func (s *StowStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) { _, c, k, err := reference.Split() if err != nil { From 63a1faaba0b9b0dc642938bdad484d1fe412a630 Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario Date: Tue, 20 Aug 2024 10:53:33 -0400 Subject: [PATCH 28/29] Fix comment symbol Signed-off-by: Eduardo Apolinario --- .../go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go index 5d145123de..b77615120a 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go @@ -61,7 +61,7 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID, consoleURL string) []v1 Value: nodeExecutionID.Domain, }, { - # FLYTE_INTERNAL_POD_NAME + // FLYTE_INTERNAL_POD_NAME Name: "_F_PN", ValueFrom: &v1.EnvVarSource{ 
FieldRef: &v1.ObjectFieldSelector{ From 52a6b7706815475660253c8c989fc82c36338377 Mon Sep 17 00:00:00 2001 From: Bugra Gedik Date: Tue, 20 Aug 2024 17:47:37 +0000 Subject: [PATCH 29/29] fix one more test Signed-off-by: Bugra Gedik --- .../go/tasks/pluginmachinery/flytek8s/container_helper_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper_test.go index 4e609c72b2..3b7aa88aeb 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper_test.go @@ -525,7 +525,7 @@ func TestAddFlyteCustomizationsToContainer(t *testing.T) { assert.EqualValues(t, container.Command, []string{"s3://input/path"}) assert.Len(t, container.Resources.Limits, 3) assert.Len(t, container.Resources.Requests, 3) - assert.Len(t, container.Env, 12) + assert.Len(t, container.Env, 13) } func TestAddFlyteCustomizationsToContainer_Resources(t *testing.T) {