Skip to content

Commit

Permalink
Fixed k8secret entrypoint script and added Status info get execution …
Browse files Browse the repository at this point in the history
…response (flyteorg#322)

* Hot fix gorm issue

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* Removed the bigint type conversion on ID column and instead using only during migration

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* Refactored and added rollback

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* Added mocked unit tests for migration changes

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* Added stateOccuredAt time to model and UpdateExecution impl

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* Fixed k8secret entrypoint script and added Status info get execution response

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* Added ExecutionStateChangeDetails impl changes

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* Updated get and update execution for state changes

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* update execution for state changes

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* linter fixes

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* added coverage

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* feedback

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* feedback

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* feedback

Signed-off-by: Prafulla Mahindrakar <[email protected]>
  • Loading branch information
pmahindrakar-oss authored Jan 21, 2022
1 parent a6932ce commit 53594c4
Show file tree
Hide file tree
Showing 14 changed files with 488 additions and 17 deletions.
13 changes: 12 additions & 1 deletion flyteadmin/cmd/entrypoints/k8s_secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/flyteorg/flyteadmin/pkg/config"
executioncluster "github.com/flyteorg/flyteadmin/pkg/executioncluster/impl"
"github.com/flyteorg/flyteadmin/pkg/executioncluster/interfaces"
"github.com/flyteorg/flyteadmin/pkg/runtime"
"github.com/flyteorg/flytestdlib/errors"
"github.com/flyteorg/flytestdlib/promutils"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -99,7 +102,15 @@ func persistSecrets(ctx context.Context, _ *pflag.FlagSet) error {
initializationErrorCounter := scope.NewSubScope("secrets").MustNewCounter(
"flyteclient_initialization_error",
"count of errors encountered initializing a flyte client from kube config")
listTargetsProvider, err := executioncluster.NewListTargets(initializationErrorCounter, executioncluster.NewExecutionTargetProvider(), configuration.ClusterConfiguration())

var listTargetsProvider interfaces.ListTargetsInterface
var err error
if len(configuration.ClusterConfiguration().GetClusterConfigs()) == 0 {
serverConfig := config.GetConfig()
listTargetsProvider, err = executioncluster.NewInCluster(initializationErrorCounter, serverConfig.KubeConfig, serverConfig.Master)
} else {
listTargetsProvider, err = executioncluster.NewListTargets(initializationErrorCounter, executioncluster.NewExecutionTargetProvider(), configuration.ClusterConfiguration())
}
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/benbjohnson/clock v1.1.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/flyteorg/flyteidl v0.21.18
github.com/flyteorg/flyteidl v0.21.24
github.com/flyteorg/flyteplugins v0.9.1
github.com/flyteorg/flytepropeller v0.16.14
github.com/flyteorg/flytestdlib v0.4.7
Expand Down
3 changes: 2 additions & 1 deletion flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,9 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v0.21.11/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.21.18 h1:tOrb8U96mJPbiYFDGgoafn/XO2EAWK3U6JWzPIlrKO4=
github.com/flyteorg/flyteidl v0.21.18/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.21.24 h1:e2wPBK4aiLE+fw2zmhUDNg39QoJk6Lf5lQRvj8XgtFk=
github.com/flyteorg/flyteidl v0.21.24/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.9.1 h1:Z0gxSvG7LeI+COfEmuzkhz9RnJ4E5wWUcjj5qh1uKuw=
github.com/flyteorg/flyteplugins v0.9.1/go.mod h1:OEGQztPFDJG4DV7tS9lDsRRd521iUINn5dcsBf6bW5k=
github.com/flyteorg/flytepropeller v0.16.14 h1:zG+UnfZLPCQdwh7ORm3BNwXlb6Sp2Wwa7I7NnZYcPDw=
Expand Down
49 changes: 49 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,31 @@ func (m *ExecutionManager) GetExecution(
return execution, nil
}

func (m *ExecutionManager) UpdateExecution(ctx context.Context, request admin.ExecutionUpdateRequest,
requestedAt time.Time) (*admin.ExecutionUpdateResponse, error) {
if err := validation.ValidateWorkflowExecutionIdentifier(request.Id); err != nil {
logger.Debugf(ctx, "UpdateExecution request [%+v] failed validation with err: %v", request, err)
return nil, err
}
ctx = getExecutionContext(ctx, request.Id)
executionModel, err := util.GetExecutionModel(ctx, m.db, *request.Id)
if err != nil {
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err: %v", request, err)
return nil, err
}

if err = transformers.UpdateExecutionModelStateChangeDetails(executionModel, request.State, requestedAt,
getUser(ctx)); err != nil {
return nil, err
}

if err := m.db.ExecutionRepo().Update(ctx, *executionModel); err != nil {
return nil, err
}

return &admin.ExecutionUpdateResponse{}, nil
}

func (m *ExecutionManager) GetExecutionData(
ctx context.Context, request admin.WorkflowExecutionGetDataRequest) (*admin.WorkflowExecutionGetDataResponse, error) {
ctx = getExecutionContext(ctx, request.Id)
Expand Down Expand Up @@ -1358,6 +1383,11 @@ func (m *ExecutionManager) ListExecutions(
joinTableEntities[filter.GetEntity()] = true
}

// Check if state filter exists and if not then add filter to fetch only ACTIVE executions
if filters, err = addStateFilter(filters); err != nil {
return nil, err
}

listExecutionsInput := repositoryInterfaces.ListResourceInput{
Limit: int(request.Limit),
Offset: offset,
Expand Down Expand Up @@ -1587,3 +1617,22 @@ func (m *ExecutionManager) addProjectLabels(ctx context.Context, projectName str
}
return initialLabels, nil
}

func addStateFilter(filters []common.InlineFilter) ([]common.InlineFilter, error) {
var stateFilterExists bool
for _, inlineFilter := range filters {
if inlineFilter.GetField() == shared.State {
stateFilterExists = true
}
}

if !stateFilterExists {
stateFilter, err := common.NewSingleValueFilter(common.Execution, common.Equal, shared.State,
admin.ExecutionState_EXECUTION_ACTIVE)
if err != nil {
return filters, err
}
filters = append(filters, stateFilter)
}
return filters, nil
}
152 changes: 152 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ var specBytes, _ = proto.Marshal(spec)
var phase = core.WorkflowExecution_RUNNING.String()
var closure = admin.ExecutionClosure{
Phase: core.WorkflowExecution_RUNNING,
StateChangeDetails: &admin.ExecutionStateChangeDetails{
State: admin.ExecutionState_EXECUTION_ACTIVE,
OccurredAt: testutils.MockCreatedAtProto,
},
}
var closureBytes, _ = proto.Marshal(&closure)

Expand Down Expand Up @@ -106,6 +110,10 @@ func getLegacyClosure() *admin.ExecutionClosure {
return &admin.ExecutionClosure{
Phase: core.WorkflowExecution_RUNNING,
ComputedInputs: getLegacySpec().Inputs,
StateChangeDetails: &admin.ExecutionStateChangeDetails{
State: admin.ExecutionState_EXECUTION_ACTIVE,
OccurredAt: testutils.MockCreatedAtProto,
},
}
}

Expand Down Expand Up @@ -1491,6 +1499,10 @@ func TestCreateWorkflowEvent_StartedRunning(t *testing.T) {
Phase: core.WorkflowExecution_RUNNING,
StartedAt: occurredAtProto,
UpdatedAt: occurredAtProto,
StateChangeDetails: &admin.ExecutionStateChangeDetails{
State: admin.ExecutionState_EXECUTION_ACTIVE,
OccurredAt: testutils.MockCreatedAtProto,
},
}
closureBytes, _ := proto.Marshal(&closure)
updateExecutionFunc := func(
Expand Down Expand Up @@ -1746,6 +1758,9 @@ func TestGetExecution(t *testing.T) {
assert.Equal(t, "domain", input.Domain)
assert.Equal(t, "name", input.Name)
return models.Execution{
BaseModel: models.BaseModel{
CreatedAt: testutils.MockCreatedAtValue,
},
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Expand Down Expand Up @@ -1820,6 +1835,89 @@ func TestGetExecution_TransformerError(t *testing.T) {
assert.Equal(t, codes.Internal, err.(flyteAdminErrors.FlyteAdminError).Code())
}

func TestUpdateExecution(t *testing.T) {
t.Run("invalid execution identifier", func(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})
_, err := execManager.UpdateExecution(context.Background(), admin.ExecutionUpdateRequest{
Id: &core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
},
}, time.Now())
assert.Error(t, err)
})

t.Run("empty status passed", func(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
updateExecFuncCalled := false
updateExecFunc := func(ctx context.Context, execModel models.Execution) error {
stateInt := int32(admin.ExecutionState_EXECUTION_ACTIVE)
assert.Equal(t, stateInt, *execModel.State)
updateExecFuncCalled = true
return nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateExecutionCallback(updateExecFunc)
execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})
updateResponse, err := execManager.UpdateExecution(context.Background(), admin.ExecutionUpdateRequest{
Id: &executionIdentifier,
}, time.Now())
assert.NoError(t, err)
assert.NotNil(t, updateResponse)
assert.True(t, updateExecFuncCalled)
})

t.Run("archive status passed", func(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
updateExecFuncCalled := false
updateExecFunc := func(ctx context.Context, execModel models.Execution) error {
stateInt := int32(admin.ExecutionState_EXECUTION_ARCHIVED)
assert.Equal(t, stateInt, *execModel.State)
updateExecFuncCalled = true
return nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateExecutionCallback(updateExecFunc)
execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})
updateResponse, err := execManager.UpdateExecution(context.Background(), admin.ExecutionUpdateRequest{
Id: &executionIdentifier,
State: admin.ExecutionState_EXECUTION_ARCHIVED,
}, time.Now())
assert.NoError(t, err)
assert.NotNil(t, updateResponse)
assert.True(t, updateExecFuncCalled)
})

t.Run("update error", func(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
updateExecFunc := func(ctx context.Context, execModel models.Execution) error {
return fmt.Errorf("some db error")
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateExecutionCallback(updateExecFunc)
execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})
_, err := execManager.UpdateExecution(context.Background(), admin.ExecutionUpdateRequest{
Id: &executionIdentifier,
State: admin.ExecutionState_EXECUTION_ARCHIVED,
}, time.Now())
assert.Error(t, err)
assert.Equal(t, "some db error", err.Error())
})

t.Run("get execution error", func(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
getExecFunc := func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) {
return models.Execution{}, fmt.Errorf("some db error")
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(getExecFunc)
execManager := NewExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})
_, err := execManager.UpdateExecution(context.Background(), admin.ExecutionUpdateRequest{
Id: &executionIdentifier,
State: admin.ExecutionState_EXECUTION_ARCHIVED,
}, time.Now())
assert.Error(t, err)
assert.Equal(t, "some db error", err.Error())
})
}

func TestListExecutions(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
executionListFunc := func(
Expand Down Expand Up @@ -1850,6 +1948,9 @@ func TestListExecutions(t *testing.T) {
return interfaces.ExecutionCollectionOutput{
Executions: []models.Execution{
{
BaseModel: models.BaseModel{
CreatedAt: testutils.MockCreatedAtValue,
},
ExecutionKey: models.ExecutionKey{
Project: projectValue,
Domain: domainValue,
Expand All @@ -1859,6 +1960,9 @@ func TestListExecutions(t *testing.T) {
Closure: closureBytes,
},
{
BaseModel: models.BaseModel{
CreatedAt: testutils.MockCreatedAtValue,
},
ExecutionKey: models.ExecutionKey{
Project: projectValue,
Domain: domainValue,
Expand Down Expand Up @@ -2538,6 +2642,9 @@ func TestGetExecution_Legacy(t *testing.T) {
assert.Equal(t, "domain", input.Domain)
assert.Equal(t, "name", input.Name)
return models.Execution{
BaseModel: models.BaseModel{
CreatedAt: testutils.MockCreatedAtValue,
},
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Expand Down Expand Up @@ -2577,6 +2684,9 @@ func TestGetExecutionData_LegacyModel(t *testing.T) {

executionGetFunc := func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) {
return models.Execution{
BaseModel: models.BaseModel{
CreatedAt: testutils.MockCreatedAtValue,
},
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Expand Down Expand Up @@ -2770,6 +2880,9 @@ func TestListExecutions_LegacyModel(t *testing.T) {
return interfaces.ExecutionCollectionOutput{
Executions: []models.Execution{
{
BaseModel: models.BaseModel{
CreatedAt: testutils.MockCreatedAtValue,
},
ExecutionKey: models.ExecutionKey{
Project: projectValue,
Domain: domainValue,
Expand All @@ -2779,6 +2892,9 @@ func TestListExecutions_LegacyModel(t *testing.T) {
Closure: getLegacyClosureBytes(),
},
{
BaseModel: models.BaseModel{
CreatedAt: testutils.MockCreatedAtValue,
},
ExecutionKey: models.ExecutionKey{
Project: projectValue,
Domain: domainValue,
Expand Down Expand Up @@ -3749,3 +3865,39 @@ func TestFromAdminProtoTaskResourceSpec(t *testing.T) {
GPU: resource.MustParse("2"),
}, taskResourceSet)
}

func TestAddStateFilter(t *testing.T) {
t.Run("empty filters", func(t *testing.T) {
var filters []common.InlineFilter
updatedFilters, err := addStateFilter(filters)
assert.Nil(t, err)
assert.NotNil(t, updatedFilters)
assert.Equal(t, 1, len(updatedFilters))

assert.Equal(t, shared.State, updatedFilters[0].GetField())
assert.Equal(t, common.Execution, updatedFilters[0].GetEntity())

expression, err := updatedFilters[0].GetGormQueryExpr()
assert.NoError(t, err)
assert.Equal(t, "state = ?", expression.Query)
})

t.Run("passed state filter", func(t *testing.T) {
filter, err := common.NewSingleValueFilter(common.Execution, common.NotEqual, "state", "0")
assert.NoError(t, err)
filters := []common.InlineFilter{filter}

updatedFilters, err := addStateFilter(filters)
assert.Nil(t, err)
assert.NotNil(t, updatedFilters)
assert.Equal(t, 1, len(updatedFilters))

assert.Equal(t, shared.State, updatedFilters[0].GetField())
assert.Equal(t, common.Execution, updatedFilters[0].GetEntity())

expression, err := updatedFilters[0].GetGormQueryExpr()
assert.NoError(t, err)
assert.Equal(t, "state <> ?", expression.Query)
})

}
2 changes: 2 additions & 0 deletions flyteadmin/pkg/manager/interfaces/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type ExecutionInterface interface {
CreateWorkflowEvent(ctx context.Context, request admin.WorkflowExecutionEventRequest) (
*admin.WorkflowExecutionEventResponse, error)
GetExecution(ctx context.Context, request admin.WorkflowExecutionGetRequest) (*admin.Execution, error)
UpdateExecution(ctx context.Context, request admin.ExecutionUpdateRequest, requestedAt time.Time) (
*admin.ExecutionUpdateResponse, error)
GetExecutionData(ctx context.Context, request admin.WorkflowExecutionGetDataRequest) (
*admin.WorkflowExecutionGetDataResponse, error)
ListExecutions(ctx context.Context, request admin.ResourceListRequest) (*admin.ExecutionList, error)
Expand Down
15 changes: 15 additions & 0 deletions flyteadmin/pkg/manager/mocks/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type RecoverExecutionFunc func(ctx context.Context, request admin.ExecutionRecov
type CreateExecutionEventFunc func(ctx context.Context, request admin.WorkflowExecutionEventRequest) (
*admin.WorkflowExecutionEventResponse, error)
type GetExecutionFunc func(ctx context.Context, request admin.WorkflowExecutionGetRequest) (*admin.Execution, error)
type UpdateExecutionFunc func(ctx context.Context, request admin.ExecutionUpdateRequest, requestedAt time.Time) (
*admin.ExecutionUpdateResponse, error)
type GetExecutionDataFunc func(ctx context.Context, request admin.WorkflowExecutionGetDataRequest) (
*admin.WorkflowExecutionGetDataResponse, error)
type ListExecutionFunc func(ctx context.Context, request admin.ResourceListRequest) (*admin.ExecutionList, error)
Expand All @@ -30,6 +32,7 @@ type MockExecutionManager struct {
RecoverExecutionFunc RecoverExecutionFunc
createExecutionEventFunc CreateExecutionEventFunc
getExecutionFunc GetExecutionFunc
updateExecutionFunc UpdateExecutionFunc
getExecutionDataFunc GetExecutionDataFunc
listExecutionFunc ListExecutionFunc
terminateExecutionFunc TerminateExecutionFunc
Expand Down Expand Up @@ -82,6 +85,18 @@ func (m *MockExecutionManager) CreateWorkflowEvent(
return nil, nil
}

func (m *MockExecutionManager) SetUpdateExecutionCallback(updateExecutionFunc UpdateExecutionFunc) {
m.updateExecutionFunc = updateExecutionFunc
}

func (m *MockExecutionManager) UpdateExecution(ctx context.Context, request admin.ExecutionUpdateRequest,
requestedAt time.Time) (*admin.ExecutionUpdateResponse, error) {
if m.updateExecutionFunc != nil {
return m.updateExecutionFunc(ctx, request, requestedAt)
}
return nil, nil
}

func (m *MockExecutionManager) SetGetCallback(getExecutionFunc GetExecutionFunc) {
m.getExecutionFunc = getExecutionFunc
}
Expand Down
Loading

0 comments on commit 53594c4

Please sign in to comment.