Skip to content

Commit

Permalink
update integration tests
Browse files Browse the repository at this point in the history
Signed-off-by: cosmicBboy <[email protected]>
  • Loading branch information
cosmicBboy committed Apr 14, 2021
1 parent dbc242e commit 923b1d1
Showing 1 changed file with 37 additions and 26 deletions.
63 changes: 37 additions & 26 deletions tests/flytekit/integration/control_plane/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

import pytest

from flytekit.control_plane import launch_plan
from flytekit.control_plane import launch_plan, workflow
from flytekit.models.core.identifier import Identifier, ResourceType
from flytekit.models import literals

PROJECT = "flytesnacks"
VERSION = os.getpid()


@pytest.fixture(scope="session")
Expand All @@ -18,7 +20,7 @@ def flyte_workflows_source_dir():
@pytest.fixture(scope="session")
def flyte_workflows_register(docker_compose):
docker_compose.execute(
f"exec -w /flyteorg/src -e SANDBOX=1 -e PROJECT={PROJECT} -e VERSION=v{os.getpid()} "
f"exec -w /flyteorg/src -e SANDBOX=1 -e PROJECT={PROJECT} -e VERSION=v{VERSION} "
"backend make -C workflows register"
)

Expand All @@ -29,29 +31,38 @@ def test_client(flyteclient):


def test_launch_workflow(flyteclient, flyte_workflows_register):
execution = launch_plan.FlyteLaunchPlan.fetch(
PROJECT, "development", "workflows.basic.basic_workflow.my_wf", f"v{os.getpid()}"
).launch_with_literals(
PROJECT,
"development",
literals.LiteralMap(
{
"a": literals.Literal(literals.Scalar(literals.Primitive(integer=10))),
"b": literals.Literal(literals.Scalar(literals.Primitive(string_value="foobar"))),
}
),
for wf, data in [
("workflows.basic.hello_world.my_wf", literals.LiteralMap({})),
]:
lp = launch_plan.FlyteLaunchPlan.fetch(PROJECT, "development", wf, f"v{VERSION}")
execution = lp.launch_with_literals(PROJECT, "development", data)
execution.wait_for_completion()
print(execution.id)


def test_get_workflow(flyteclient, flyte_workflows_register):
wf_id = Identifier(
ResourceType.WORKFLOW, PROJECT, "development", "workflows.basic.basic_workflow.my_wf", f"v{VERSION}"
)
wf = flyteclient.get_workflow(wf_id)
print(wf)
assert wf_id == wf.id


for _ in range(20):
if not execution.is_complete:
time.sleep(0.5)
execution.sync()
if execution.node_executions is not None and len(execution.node_executions) > 1:
execution.node_executions["n0"]
# TODO: monitor and inspect node executions
continue
else:
break

assert execution.outputs.literals["o0"].scalar.primitive.integer == 12
assert execution.outputs.literals["o1"].scalar.primitive.string_value == "foobarworld"
def test_launch_workflow_with_args(flyteclient, flyte_workflows_register):
for wf, data in [
(
"workflows.basic.basic_workflow.my_wf",
literals.LiteralMap(
{
"a": literals.Literal(literals.Scalar(literals.Primitive(integer=10))),
"b": literals.Literal(literals.Scalar(literals.Primitive(string_value="foobar"))),
}
),
)
]:
lp = launch_plan.FlyteLaunchPlan.fetch(PROJECT, "development", wf, f"v{VERSION}")
execution = lp.launch_with_literals(PROJECT, "development", data)
execution.wait_for_completion()
assert execution.outputs.literals["o0"].scalar.primitive.integer == 12
assert execution.outputs.literals["o1"].scalar.primitive.string_value == "foobarworld"

0 comments on commit 923b1d1

Please sign in to comment.