Skip to content

Commit

Permalink
feat(Pipelines): Try to convert raw outputs to database table or bucket
Browse files Browse the repository at this point in the history
object when possible
  • Loading branch information
qgerome committed Aug 11, 2023
1 parent ce4fc62 commit 8812fdc
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 4 deletions.
5 changes: 4 additions & 1 deletion hexa/pipelines/graphql/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ enum PipelineRunTrigger {
manual
}

type PipelineRunOutput {
type GenericOutput {
name: String
type: String!
uri: String!
}


union PipelineRunOutput = BucketObject | GenericOutput | DatabaseTable

enum PipelineRunStatus {
success
running
Expand Down
46 changes: 44 additions & 2 deletions hexa/pipelines/schema.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import base64
import pathlib

from ariadne import EnumType, MutationType, ObjectType, QueryType, load_schema_from_path
from ariadne import (
EnumType,
MutationType,
ObjectType,
QueryType,
UnionType,
load_schema_from_path,
)
from django.conf import settings
from django.core.exceptions import PermissionDenied
from django.db import IntegrityError
from django.http import HttpRequest

from hexa.core.graphql import result_page
from hexa.databases.utils import get_table_definition
from hexa.files.api import get_bucket_object
from hexa.workspaces.models import Workspace, WorkspaceMembershipRole
from hexa.workspaces.schema.types import workspace_permissions

Expand Down Expand Up @@ -47,6 +56,18 @@ def resolve_workspace_permissions_create_pipeline(obj: Workspace, info, **kwargs
},
)
pipeline_object = ObjectType("Pipeline")
generic_output_object = ObjectType("GenericOutput")

pipeline_run_output_union = UnionType("PipelineRunOutput")


@pipeline_run_output_union.type_resolver
def resolve_run_output_type(obj, *_):
if obj["type"] == "file":
return "BucketObject"
elif obj["type"] == "table":
return "DatabaseTable"
return "GenericOutput"


@pipeline_parameter.field("name")
Expand Down Expand Up @@ -129,7 +150,6 @@ def resolve_pipeline_versions(pipeline: Pipeline, info, **kwargs):

@pipeline_object.field("runs")
def resolve_pipeline_runs(pipeline: Pipeline, info, **kwargs):
request: HttpRequest = info.context["request"]
qs = PipelineRun.objects.filter(pipeline=pipeline)

order_by = kwargs.get("orderBy", None)
Expand Down Expand Up @@ -254,6 +274,26 @@ def resolve_pipeline_run(_, info, **kwargs):
return None


@pipeline_run_object.field("outputs")
def resolve_pipeline_run_outputs(run: PipelineRun, info, **kwargs):
result = []
workspace = run.pipeline.workspace
for output in run.outputs:
if output["type"] == "file":
result.append(
get_bucket_object(
workspace.bucket_name,
output["uri"][len(f"gs://{workspace.bucket_name}/") :],
)
)
elif output["type"] == "db":
result.append(get_table_definition(workspace, output["name"]))
else:
result.append(output)

return result


pipelines_mutations = MutationType()


Expand Down Expand Up @@ -562,4 +602,6 @@ def resolve_add_pipeline_output(_, info, **kwargs):
pipeline_run_order_by_enum,
pipeline_version_object,
pipeline_permissions,
pipeline_run_output_union,
generic_output_object,
]
43 changes: 42 additions & 1 deletion hexa/pipelines/tests/test_schema.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import base64
import uuid
from unittest.mock import patch
from unittest.mock import MagicMock, patch

from django import test
from django.conf import settings
Expand Down Expand Up @@ -435,6 +435,47 @@ def test_pipeline_by_code(self):
r["data"]["pipelineByCode"],
)

def test_pipeline_run_outputs(self):
self.test_create_pipeline_version()
self.client.force_login(self.USER_ROOT)
pipeline = Pipeline.objects.get(code="new_pipeline")
run = pipeline.run(
user=self.USER_ROOT,
pipeline_version=pipeline.last_version,
trigger_mode=PipelineRunTrigger.MANUAL,
config={},
)
run.state = PipelineRunState.SUCCESS
run.add_output(
f"gs://{pipeline.workspace.bucket_name}/my_file", "file", "my_file"
)
run.add_output("uri2", "db", "my_table")
run.add_output("uri3", "link", "my_link")

with patch(
"hexa.pipelines.schema.get_bucket_object",
MagicMock(),
) as bucket_mock, patch(
"hexa.pipelines.schema.get_table_definition",
MagicMock(),
) as table_mock:
r = self.run_query(
"""
query pipelineRunOutputs($id: UUID!) {
pipelineRun(id: $id) {
outputs {
__typename
}
}
}
""",
{"id": str(run.id)},
)
bucket_mock.assert_called_once_with(
pipeline.workspace.bucket_name, "my_file"
)
table_mock.assert_called_once_with(pipeline.workspace, "my_table")

def test_delete_pipeline_version_pipeline_not_found(self):
self.test_create_pipeline()
self.client.force_login(self.USER_ROOT)
Expand Down

0 comments on commit 8812fdc

Please sign in to comment.