Skip to content

Commit

Permalink
Fix list executions for single task (flyteorg#130)
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored Oct 12, 2020
1 parent 7945565 commit f305d39
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 16 deletions.
13 changes: 9 additions & 4 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1153,11 +1153,16 @@ func (m *ExecutionManager) ListExecutions(
return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid pagination token %s for ListExecutions",
request.Token)
}
joinTableEntities := make(map[common.Entity]bool)
for _, filter := range filters {
joinTableEntities[filter.GetEntity()] = true
}
listExecutionsInput := repositoryInterfaces.ListResourceInput{
Limit: int(request.Limit),
Offset: offset,
InlineFilters: filters,
SortParameter: sortParameter,
Limit: int(request.Limit),
Offset: offset,
InlineFilters: filters,
SortParameter: sortParameter,
JoinTableEntities: joinTableEntities,
}
output, err := m.db.ExecutionRepo().List(ctx, listExecutionsInput)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1462,6 +1462,9 @@ func TestListExecutions(t *testing.T) {
assert.Equal(t, limit, input.Limit)
assert.Equal(t, "domain asc", input.SortParameter.GetGormOrderExpr())
assert.Equal(t, 2, input.Offset)
assert.EqualValues(t, map[common.Entity]bool{
common.Execution: true,
}, input.JoinTableEntities)
return interfaces.ExecutionCollectionOutput{
Executions: []models.Execution{
{
Expand Down
21 changes: 15 additions & 6 deletions pkg/repositories/gormimpl/execution_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"github.com/lyft/flyteadmin/pkg/common"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"

"github.com/jinzhu/gorm"
Expand Down Expand Up @@ -109,12 +111,19 @@ func (r *ExecutionRepo) List(ctx context.Context, input interfaces.ListResourceI
}
var executions []models.Execution
tx := r.db.Limit(input.Limit).Offset(input.Offset)
// And add join condition (joining multiple tables is fine even we only filter on a subset of table attributes).
// (this query isn't called for deletes).
tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.launch_plan_id = %s.id",
launchPlanTableName, executionTableName, launchPlanTableName))
tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.workflow_id = %s.id",
workflowTableName, executionTableName, workflowTableName))
// And add join condition as required by user-specified filters (which can potentially include join table attrs).
if ok := input.JoinTableEntities[common.LaunchPlan]; ok {
tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.launch_plan_id = %s.id",
launchPlanTableName, executionTableName, launchPlanTableName))
}
if ok := input.JoinTableEntities[common.Workflow]; ok {
tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.workflow_id = %s.id",
workflowTableName, executionTableName, workflowTableName))
}
if ok := input.JoinTableEntities[common.Task]; ok {
tx = tx.Joins(fmt.Sprintf("INNER JOIN %s ON %s.task_id = %s.id",
taskTableName, executionTableName, taskTableName))
}

// Apply filters
tx, err := applyScopedFilters(tx, input.InlineFilters, input.MapFilters)
Expand Down
14 changes: 10 additions & 4 deletions pkg/repositories/gormimpl/execution_repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,11 @@ func TestListExecutionsForWorkflow(t *testing.T) {
executions = append(executions, execution)

GlobalMock := mocket.Catcher.Reset()
query := `SELECT "executions".* FROM "executions" INNER JOIN launch_plans ON executions.launch_plan_id = ` +
`launch_plans.id INNER JOIN workflows ON executions.workflow_id = workflows.id WHERE "executions"."deleted_at"` +
` IS NULL AND ((executions.execution_project = project) AND (executions.execution_domain = domain) AND ` +
`(executions.execution_name = 1) AND (workflows.name = workflow_name)) LIMIT 20 OFFSET 0`
query := `SELECT "executions".* FROM "executions" INNER JOIN workflows ON executions.workflow_id = workflows.id ` +
`INNER JOIN tasks ON executions.task_id = tasks.id WHERE "executions"."deleted_at" IS NULL AND ` +
`((executions.execution_project = project) AND (executions.execution_domain = domain) AND ` +
`(executions.execution_name = 1) AND (workflows.name = workflow_name) AND (tasks.name = task_name)) ` +
`LIMIT 20 OFFSET 0`
GlobalMock.NewMock().WithQuery(query).WithReply(executions)

collection, err := executionRepo.List(context.Background(), interfaces.ListResourceInput{
Expand All @@ -387,8 +388,13 @@ func TestListExecutionsForWorkflow(t *testing.T) {
getEqualityFilter(common.Execution, "domain", domain),
getEqualityFilter(common.Execution, "name", "1"),
getEqualityFilter(common.Workflow, "name", "workflow_name"),
getEqualityFilter(common.Task, "name", "task_name"),
},
Limit: 20,
JoinTableEntities: map[common.Entity]bool{
common.Workflow: true,
common.Task: true,
},
})
assert.NoError(t, err)
assert.NotEmpty(t, collection)
Expand Down
3 changes: 3 additions & 0 deletions pkg/repositories/interfaces/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type ListResourceInput struct {
// pq driver value substitution.
MapFilters []common.MapFilter
SortParameter common.SortParameter
// A set of the entities (besides the primary table being queries) that should be joined with when performing
// the list query. This enables filtering on non-primary entity attributes.
JoinTableEntities map[common.Entity]bool
}

// Describes a set of resources for which to apply attribute updates.
Expand Down
4 changes: 2 additions & 2 deletions tests/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ package tests
import (
"context"
"fmt"
"github.com/lyft/flyteadmin/pkg/manager/impl/testutils"
"io/ioutil"
"net/http"
"testing"

"github.com/lyft/flyteadmin/pkg/manager/impl/testutils"

"github.com/golang/protobuf/proto"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
Expand All @@ -30,7 +31,6 @@ func TestCreateWorkflow(t *testing.T) {
_, err := client.CreateTask(ctx, &taskCreateReq)
assert.NoError(t, err)


identifier := core.Identifier{
ResourceType: core.ResourceType_WORKFLOW,
Project: "admintests",
Expand Down

0 comments on commit f305d39

Please sign in to comment.