Skip to content

Commit

Permalink
Use a wrapper to convert resolvers to async-generators
Browse files Browse the repository at this point in the history
  • Loading branch information
kinow committed Oct 1, 2019
1 parent 527907d commit d4e2503
Showing 1 changed file with 94 additions and 18 deletions.
112 changes: 94 additions & 18 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,20 @@
"""GraphQL API schema via Graphene implementation."""

import asyncio
from typing import Callable, AsyncGenerator, Any

from cylc.flow.task_state import TASK_STATUSES_ORDERED
from cylc.flow.ws_data_mgr import (
ID_DELIM, FAMILIES, FAMILY_PROXIES,
JOBS, TASKS, TASK_PROXIES
)
from graphene import (
Boolean, Field, Float, ID, InputObjectType, Int,
List, Mutation, ObjectType, Schema, String, Union
)
from graphene.types.generic import GenericScalar
from graphene.utils.str_converters import to_snake_case

from cylc.flow.task_state import TASK_STATUSES_ORDERED
from cylc.flow.ws_data_mgr import (
ID_DELIM, FAMILIES, FAMILY_PROXIES,
JOBS, TASKS, TASK_PROXIES
)


PROXY_NODES = 'proxy_nodes'


Expand Down Expand Up @@ -278,16 +277,6 @@ async def get_workflows(root, info, **args):
return await resolvers.get_workflows(args)


async def subscribe_workflows(root, info, **args):
while True:
args['workflows'] = [parse_workflow_id(w_id) for w_id in args['ids']]
args['exworkflows'] = [parse_workflow_id(w_id) for w_id in
args['exids']]
resolvers = info.context.get('resolvers')
yield await resolvers.get_workflows(args)
await asyncio.sleep(5.)


async def get_nodes_all(root, info, **args):
"""Resolver for returning job, task, family nodes"""
field_name = to_snake_case(info.field_name)
Expand Down Expand Up @@ -1196,15 +1185,102 @@ class Mutations(ObjectType):

# ** Subscription Related ** #

def to_subscription(func: Callable, sleep_seconds: float = 5.) -> Callable:
"""Wraps a function in a while-true-sleep, transforming
the function into an async-generator, used by the
websockets/subscriptions.
Args:
func (Callable): a callable.
sleep_seconds (float): asyncio sleep interval in seconds.
Returns:
Callable: a callable async-generator wrapping the original callable.
"""
async def gen(*args: Any, **kwargs: Any) -> AsyncGenerator[Any, None]:
"""
Args:
*args: Variable length argument list, varies as per schema.
**kwargs: Arbitrary keyword arguments, varies as per schema.
Returns:
AsyncGenerator[Any, None]: an async generator that will yield values from
resolvers.
"""
while True:
yield await func(*args, **kwargs)
await asyncio.sleep(sleep_seconds)
return gen


class Subscriptions(ObjectType):
"""Defines the subscriptions available in the schema."""
class Meta:
description = """Multi-Workflow root level subscriptions."""
workflows = List(
Workflow,
description=Workflow._meta.description,
ids=List(ID, default_value=[]),
exids=List(ID, default_value=[]),
resolver=subscribe_workflows)
resolver=to_subscription(get_workflows))
job = Field(
Job,
description=Job._meta.description,
id=ID(required=True),
resolver=to_subscription(get_node_by_id))
jobs = List(
Job,
description=Job._meta.description,
args=all_jobs_args,
resolver=to_subscription(get_nodes_all))
task = Field(
Task,
description=Task._meta.description,
id=ID(required=True),
resolver=to_subscription(get_node_by_id))
tasks = List(
Task,
description=Task._meta.description,
args=all_def_args,
resolver=to_subscription(get_nodes_all))
task_proxy = Field(
TaskProxy,
description=TaskProxy._meta.description,
id=ID(required=True),
resolver=to_subscription(get_node_by_id))
task_proxies = List(
TaskProxy,
description=TaskProxy._meta.description,
args=all_proxy_args,
resolver=to_subscription(get_nodes_all))
family = Field(
Family,
description=Family._meta.description,
id=ID(required=True),
resolver=to_subscription(get_node_by_id))
families = List(
Family,
description=Family._meta.description,
args=all_def_args,
resolver=to_subscription(get_nodes_all))
family_proxy = Field(
FamilyProxy,
description=FamilyProxy._meta.description,
id=ID(required=True),
resolver=to_subscription(get_node_by_id))
family_proxies = List(
FamilyProxy,
description=FamilyProxy._meta.description,
args=all_proxy_args,
resolver=to_subscription(get_nodes_all))
edges = List(
Edge,
description=Edge._meta.description,
args=all_edge_args,
resolver=to_subscription(get_edges_all))
nodes_edges = Field(
NodesEdges,
description=NodesEdges._meta.description,
args=nodes_edges_args_all,
resolver=to_subscription(get_nodes_edges))


schema = Schema(query=Queries, subscription=Subscriptions, mutation=Mutations)

0 comments on commit d4e2503

Please sign in to comment.