diff --git a/supriya/contexts/core.py b/supriya/contexts/core.py index 3b94b0b43..c08e73cb8 100644 --- a/supriya/contexts/core.py +++ b/supriya/contexts/core.py @@ -24,47 +24,6 @@ from ..allocators import BlockAllocator, NodeIdAllocator from ..assets.synthdefs.default import default -from ..commands import ( - BufferAllocateReadChannelRequest, - BufferAllocateReadRequest, - BufferAllocateRequest, - BufferCloseRequest, - BufferCopyRequest, - BufferFillRequest, - BufferFreeRequest, - BufferInfo, - BufferNormalizeRequest, - BufferReadChannelRequest, - BufferReadRequest, - BufferWriteRequest, - BufferZeroRequest, - ControlBusFillRequest, - ControlBusSetRequest, - GroupDeepFreeRequest, - GroupFreeAllRequest, - GroupHeadRequest, - GroupNewRequest, - GroupTailRequest, - NodeAfterRequest, - NodeBeforeRequest, - NodeInfo, - NodeMapToAudioBusRequest, - NodeMapToControlBusRequest, - NodeOrderRequest, - NodeReleaseRequest, - NodeRunRequest, - NodeSetRequest, - ParallelGroupNewRequest, - Request, - RequestBundle, - Requestable, - SynthDefFreeAllRequest, - SynthDefFreeRequest, - SynthDefLoadDirectoryRequest, - SynthDefLoadRequest, - SynthDefReceiveRequest, - SynthNewRequest, -) from ..enums import AddAction, CalculationRate, ParameterRate from ..scsynth import Options from ..synthdefs import SynthDef @@ -73,7 +32,47 @@ CalculationRateLike, HeaderFormatLike, SampleFormatLike, + SupportsOsc, ) +from .requests import ( + AllocateBuffer, + AllocateReadBuffer, + AllocateReadBufferChannel, + CloseBuffer, + CopyBuffer, + FillBuffer, + FillControlBusRange, + FreeAllSynthDefs, + FreeBuffer, + FreeGroupChildren, + FreeGroupDeep, + FreeSynthDef, + LoadSynthDefDirectory, + LoadSynthDefs, + MapAudioBusToNode, + MapControlBusToNode, + MoveNodeAfter, + MoveNodeBefore, + MoveNodeToGroupHead, + MoveNodeToGroupTail, + NewGroup, + NewParallelGroup, + NewSynth, + NormalizeBuffer, + OrderNodes, + ReadBuffer, + ReadBufferChannel, + ReceiveSynthDefs, + ReleaseNode, + Request, + RequestBundle, + RunNode, + SetControlBus, + SetNodeControl, + WriteBuffer, + ZeroBuffer, +) +from .responses import BufferInfo, NodeInfo if TYPE_CHECKING: from .realtime import RealtimeContext @@ -139,13 +138,13 @@ def __exit__(self, *args) -> None: self.context._pop_completion() def __call__(self, request: Request) -> Request: - if not hasattr(request, "callback"): + if not hasattr(request, "on_completion"): raise ValueError(request) requests = self.context._apply_completions(self.requests) if len(requests) > 1: - request = new(request, callback=RequestBundle(contents=requests)) + request = new(request, on_completion=RequestBundle(contents=requests)) elif len(requests) == 1: - request = new(request, callback=requests[0]) + request = new(request, on_completion=requests[0]) return request @@ -427,6 +426,7 @@ def __init__(self, options: Optional[Options], **kwargs) -> None: self._client_id = 0 self._control_bus_allocator = BlockAllocator() self._latency = 0.0 + self._lock = threading.RLock() self._node_id_allocator = NodeIdAllocator() self._options = new(options or Options(), **kwargs) self._sync_id = self._sync_id_minimum = 0 @@ -529,6 +529,14 @@ def _get_moment(self) -> Optional[Moment]: return None return moments[-1] + def _get_next_sync_id(self) -> int: + with self._lock: + sync_id = self._sync_id + self._sync_id += 1 + if self._sync_id > self._sync_id_maximum: + self._sync_id = self._sync_id_minimum + return sync_id + def _get_request_context(self) -> Optional[RequestContext]: moments = self._thread_local.__dict__.get("moments", []) completions = self._thread_local.__dict__.get("completions", []) @@ -607,24 +615,26 @@ def add_buffer( raise ValueError id_ = self._allocate_id(Buffer) if file_path and channel_indices: - request: Request = BufferAllocateReadChannelRequest( + request: Request = AllocateReadBufferChannel( buffer_id=id_, channel_indices=channel_indices, - file_path=file_path, - frame_count=frame_count, - starting_frame=starting_frame, + path=file_path, + frame_count=frame_count or 0, + starting_frame=starting_frame or 0, ) elif file_path: - request = BufferAllocateReadRequest( + request = AllocateReadBuffer( buffer_id=id_, - file_path=file_path, - frame_count=frame_count, - starting_frame=starting_frame, + path=file_path, + frame_count=frame_count or 0, + starting_frame=starting_frame or 0, ) - else: - request = BufferAllocateRequest( - buffer_id=id_, channel_count=channel_count, frame_count=frame_count + elif frame_count: + request = AllocateBuffer( + buffer_id=id_, channel_count=channel_count or 1, frame_count=frame_count ) + else: + raise ValueError completion = self._add_request_with_completion(request, on_completion) return Buffer(context=self, id_=id_, completion=completion) @@ -646,7 +656,7 @@ def add_buffer_group( for i in range(count): buffers.append(Buffer(context=self, id_=id_ + i)) requests.append( - BufferAllocateRequest( + AllocateBuffer( buffer_id=id_ + i, channel_count=channel_count, frame_count=frame_count, @@ -697,15 +707,11 @@ def add_group( raise ValueError(add_action_) target_node_id = self._resolve_node(target_node) id_ = self._allocate_id(Node, permanent=permanent) - kwargs = dict( - add_action=add_action_, node_id=id_, target_node_id=target_node_id - ) + items = [(id_, add_action_, target_node_id)] if parallel: - request: Request = ParallelGroupNewRequest( - items=[GroupNewRequest.Item(**kwargs)] - ) + request: Request = NewParallelGroup(items=items) else: - request = GroupNewRequest(items=[GroupNewRequest.Item(**kwargs)]) + request = NewGroup(items=items) self._add_requests(request) return Group(context=self, id_=id_, parallel=parallel) @@ -725,7 +731,7 @@ def add_synth( if add_action_ not in target_node.valid_add_actions: raise ValueError(add_action_) target_node_id = self._resolve_node(target_node) - synthdef_kwargs: Dict[str, Union[float, str]] = {} + synthdef_kwargs: Dict[Union[int, str], Union[SupportsFloat, str]] = {} for _, parameter in synthdef.indexed_parameters: if parameter.name not in settings: continue @@ -744,12 +750,12 @@ def add_synth( synthdef_kwargs[parameter.name] = float(value) id_ = self._allocate_id(Node, permanent=permanent) self._add_requests( - SynthNewRequest( + NewSynth( add_action=add_action_, - node_id=id_, + synth_id=id_, synthdef=synthdef, target_node_id=target_node_id, - **synthdef_kwargs, + controls=synthdef_kwargs, ) ) return Synth(context=self, id_=id_, synthdef=synthdef) @@ -762,7 +768,7 @@ def add_synthdefs( self._validate_can_request() if not synthdefs: raise ValueError - request = SynthDefReceiveRequest(synthdefs=synthdefs) + request = ReceiveSynthDefs(synthdefs=synthdefs) return self._add_request_with_completion(request, on_completion) def at(self, seconds=None) -> Moment: @@ -775,7 +781,7 @@ def close_buffer( on_completion: Optional[Callable[["Context"], None]] = None, ) -> Completion: self._validate_can_request() - request = BufferCloseRequest(buffer_id=buffer.id_) + request = CloseBuffer(buffer_id=buffer.id_) return self._add_request_with_completion(request, on_completion) def copy_buffer( @@ -788,7 +794,7 @@ def copy_buffer( frame_count: int, ): self._validate_can_request() - request = BufferCopyRequest( + request = CopyBuffer( frame_count=frame_count, source_buffer_id=source_buffer, source_starting_frame=source_starting_frame, @@ -801,9 +807,8 @@ def fill_buffer( self, buffer: Buffer, starting_frame: int, frame_count: int, value: float ) -> None: self._validate_can_request() - request = BufferFillRequest( - buffer_id=buffer, - index_count_value_triples=[(starting_frame, frame_count, value)], + request = FillBuffer( + buffer_id=buffer, items=[(starting_frame, frame_count, value)] ) self._add_requests(request) @@ -811,9 +816,7 @@ def fill_buses(self, bus: Bus, count: int, value: float) -> None: self._validate_can_request() if bus.calculation_rate != CalculationRate.CONTROL: raise InvalidCalculationRate - request = ControlBusFillRequest( - index_count_value_triples=[(bus.id_, count, value)] - ) + request = FillControlBusRange(items=[(bus.id_, count, value)]) self._add_requests(request) def free_buffer( @@ -822,7 +825,7 @@ def free_buffer( on_completion: Optional[Callable[["Context"], None]] = None, ) -> Completion: self._validate_can_request() - request = BufferFreeRequest(buffer_id=buffer) + request = FreeBuffer(buffer_id=buffer) return self._add_request_with_completion(request, on_completion) def free_bus(self, bus: Bus) -> None: @@ -832,14 +835,14 @@ def free_bus(self, bus: Bus) -> None: def free_group_children(self, group: Group, synths_only=False) -> None: self._validate_can_request() if synths_only: - request: Request = GroupDeepFreeRequest(group.id_) + request: Request = FreeGroupDeep(node_ids=[group.id_]) else: - request = GroupFreeAllRequest(group.id_) + request = FreeGroupChildren(node_ids=[group.id_]) self._add_requests(request) def free_node(self, node: Node) -> None: self._validate_can_request() - request = NodeReleaseRequest( + request = ReleaseNode( node.id_, has_gate=isinstance(node, Synth) and "gate" in node.synthdef.parameters, ) @@ -849,12 +852,12 @@ def free_synthdefs(self, *synthdefs: SynthDef) -> None: self._validate_can_request() if not synthdefs: raise ValueError - request = SynthDefFreeRequest(*synthdefs) + request = FreeSynthDef(synthdefs=synthdefs) self._add_requests(request) def free_all_synthdefs(self) -> None: self._validate_can_request() - request = SynthDefFreeAllRequest() + request = FreeAllSynthDefs() self._add_requests(request) def load_synthdefs( @@ -863,7 +866,7 @@ def load_synthdefs( on_completion: Optional[Callable[["Context"], None]] = None, ) -> Completion: self._validate_can_request() - request = SynthDefLoadRequest(path=path) + request = LoadSynthDefs(path=path) return self._add_request_with_completion(request, on_completion) def load_synthdefs_directory( @@ -872,7 +875,7 @@ def load_synthdefs_directory( on_completion: Optional[Callable[["Context"], None]] = None, ) -> Completion: self._validate_can_request() - request = SynthDefLoadDirectoryRequest(path=path) + request = LoadSynthDefDirectory(path=path) return self._add_request_with_completion(request, on_completion) def map_node(self, node: Node, **settings: Union[Bus, None]) -> None: @@ -888,9 +891,13 @@ def map_node(self, node: Node, **settings: Union[Bus, None]) -> None: control[key] = -1 requests: List[Request] = [] if control: - requests.append(NodeMapToControlBusRequest(node_id=node, **control)) + requests.append( + MapControlBusToNode(node_id=node, items=sorted(control.items())) + ) if audio: - requests.append(NodeMapToAudioBusRequest(node_id=node, **audio)) + requests.append( + MapAudioBusToNode(node_id=node, items=sorted(audio.items())) + ) self._add_requests(*requests) def move_node( @@ -900,32 +907,34 @@ def move_node( add_action_ = AddAction.from_expr(add_action) items = [(node, target_node)] if add_action_ is AddAction.ADD_BEFORE: - request: Request = NodeBeforeRequest(items) + request: Request = MoveNodeBefore(items) elif add_action_ is AddAction.ADD_AFTER: - request = NodeAfterRequest(items) + request = MoveNodeAfter(items) elif add_action_ is AddAction.ADD_TO_TAIL: - request = GroupTailRequest(items) + request = MoveNodeToGroupTail(items) elif add_action_ is AddAction.ADD_TO_HEAD: - request = GroupHeadRequest(items) + request = MoveNodeToGroupHead(items) else: raise ValueError self._add_requests(request) def normalize_buffer(self, buffer: Buffer, new_maximum: float = 1.0) -> None: self._validate_can_request() - request = BufferNormalizeRequest(buffer_id=buffer.id_, new_maximum=new_maximum) + request = NormalizeBuffer(buffer_id=buffer.id_, new_maximum=new_maximum) self._add_requests(request) def order_nodes( self, target_node: Node, *nodes: Node, add_action: AddActionLike = None ) -> None: self._validate_can_request() - request = NodeOrderRequest(add_action, target_node, *nodes) + request = OrderNodes( + add_action=add_action, target_node_id=target_node, node_ids=nodes + ) self._add_requests(request) def pause_node(self, node: Node) -> None: self._validate_can_request() - request = NodeRunRequest([[node, False]]) + request = RunNode(items=[(node, False)]) self._add_requests(request) def read_buffer( @@ -941,44 +950,52 @@ def read_buffer( on_completion: Optional[Callable[["Context"], None]] = None, ) -> Completion: self._validate_can_request() - kwargs = dict( - buffer_id=buffer.id_, - file_path=file_path, - frame_count=frame_count, - leave_open=leave_open, - starting_frame_in_buffer=buffer_starting_frame, - starting_frame_in_file=starting_frame, - ) + frame_count_ = frame_count or 0 if channel_indices: - request: Request = BufferReadChannelRequest( - **kwargs, channel_indices=channel_indices + request: Request = ReadBufferChannel( + buffer_id=buffer.id_, + path=file_path, + frame_count=frame_count_ or -1, + leave_open=leave_open, + starting_frame_in_buffer=buffer_starting_frame or 0, + starting_frame_in_file=starting_frame or 0, + channel_indices=channel_indices, ) else: - request = BufferReadRequest(**kwargs) + request = ReadBuffer( + buffer_id=buffer.id_, + path=file_path, + frame_count=frame_count_ or -1, + leave_open=leave_open, + starting_frame_in_buffer=buffer_starting_frame or 0, + starting_frame_in_file=starting_frame or 0, + ) return self._add_request_with_completion(request, on_completion) @abc.abstractmethod - def send(self, requestable: Requestable): + def send(self, requestable: SupportsOsc): raise NotImplementedError def set_bus(self, bus: Bus, value: float) -> None: self._validate_can_request() if bus.calculation_rate != CalculationRate.CONTROL: raise InvalidCalculationRate - request = ControlBusSetRequest(index_value_pairs=[[bus.id_, value]]) + request = SetControlBus(items=[(bus.id_, value)]) self._add_requests(request) def set_node(self, node: Node, **settings: SupportsFloat) -> None: self._validate_can_request() - coerced_settings: Dict[str, float] = {} + coerced_settings: Dict[Union[int, str], float] = {} for key, value in settings.items(): coerced_settings[key] = float(value) - request = NodeSetRequest(node_id=node.id_, **coerced_settings) + request = SetNodeControl( + node_id=node.id_, items=sorted(coerced_settings.items()) + ) self._add_requests(request) def unpause_node(self, node: Node) -> None: self._validate_can_request() - request = NodeRunRequest([[node, True]]) + request = RunNode(items=[(node, True)]) self._add_requests(request) def write_buffer( @@ -994,14 +1011,14 @@ def write_buffer( on_completion: Optional[Callable[["Context"], None]] = None, ) -> Completion: self._validate_can_request() - request = BufferWriteRequest( + request = WriteBuffer( buffer_id=buffer.id_, - file_path=file_path, - frame_count=frame_count, + path=file_path, + frame_count=frame_count or -1, header_format=header_format, leave_open=leave_open, sample_format=sample_format, - starting_frame=starting_frame, + starting_frame=starting_frame or 0, ) return self._add_request_with_completion(request, on_completion) @@ -1011,7 +1028,7 @@ def zero_buffer( on_completion: Optional[Callable[["Context"], None]] = None, ) -> Completion: self._validate_can_request() - request = BufferZeroRequest(buffer_id=buffer) + request = ZeroBuffer(buffer_id=buffer) return self._add_request_with_completion(request, on_completion) ### PUBLIC PROPERTIES ### diff --git a/supriya/contexts/nonrealtime.py b/supriya/contexts/nonrealtime.py index f9e900a70..a9a964da1 100644 --- a/supriya/contexts/nonrealtime.py +++ b/supriya/contexts/nonrealtime.py @@ -1,19 +1,19 @@ from typing import Dict, Iterator, List, Optional, SupportsInt, Type, Union -from ..commands import Request, RequestBundle, Requestable from ..enums import CalculationRate from ..osc import OscBundle from ..scsynth import Options +from ..typing import SupportsOsc from .core import Context, ContextError, ContextObject, Node +from .requests import RequestBundle, Requestable class NonrealtimeContext(Context): - ### INITIALIZER ### def __init__(self, options: Optional[Options] = None, **kwargs): super().__init__(options=options, **kwargs) - self._requests: Dict[float, List[Request]] = {} + self._requests: Dict[float, List[Requestable]] = {} ### PRIVATE METHODS ### @@ -50,7 +50,7 @@ def iterate_request_bundles(self) -> Iterator[RequestBundle]: continue yield RequestBundle(timestamp=timestamp, contents=requests) - def send(self, requestable: Requestable) -> None: + def send(self, requestable: SupportsOsc) -> None: if not isinstance(requestable, RequestBundle): raise ContextError elif requestable.timestamp is None: diff --git a/supriya/contexts/realtime.py b/supriya/contexts/realtime.py index 67ced5fd1..523bafb12 100644 --- a/supriya/contexts/realtime.py +++ b/supriya/contexts/realtime.py @@ -3,7 +3,6 @@ import dataclasses import enum import logging -import threading from typing import ( TYPE_CHECKING, Awaitable, @@ -20,19 +19,6 @@ from uqbar.objects import new from ..assets.synthdefs import system_synthdefs -from ..commands import ( - BufferInfo, - BufferQueryRequest, - DoneResponse, - FailResponse, - GroupQueryTreeRequest, - NodeInfo, - NodeQueryRequest, - NotifyRequest, - QuitRequest, - Requestable, - SyncRequest, -) from ..enums import CalculationRate from ..exceptions import ( OwnedServerShutdown, @@ -45,16 +31,41 @@ from ..osc import ( AsyncOscProtocol, HealthCheck, - OscBundle, OscMessage, OscProtocol, OscProtocolOffline, ThreadedOscProtocol, ) -from ..querytree import QueryTreeGroup +from ..querytree import QueryTreeGroup, QueryTreeSynth from ..scsynth import AsyncProcessProtocol, Options, SyncProcessProtocol from ..synthdefs import SynthDef -from .core import Buffer, Bus, Context, ContextObject, Group, Node +from ..typing import SupportsOsc +from .core import ( + Buffer, + Bus, + Context, + ContextObject, + Group, + InvalidCalculationRate, + Node, +) +from .requests import ( + GetControlBus, + QueryBuffer, + QueryNode, + QueryTree, + Quit, + Sync, + ToggleNotifications, +) +from .responses import ( + BufferInfo, + DoneInfo, + FailInfo, + GetControlBusInfo, + NodeInfo, + QueryTreeInfo, +) if TYPE_CHECKING: from ..realtime.shm import ServerSHM @@ -80,7 +91,6 @@ class BootStatus(enum.IntEnum): class RealtimeContext(Context): - ### CLASS VARIABLES ### _contexts: Set["RealtimeContext"] = set() @@ -94,7 +104,6 @@ def __init__( self._is_owner = False self._boot_status = BootStatus.OFFLINE self._buffers: Set[int] = set() - self._lock = threading.RLock() self._maximum_logins = 1 self._node_active: Dict[int, bool] = {} self._node_children: Dict[int, List[int]] = {} @@ -137,14 +146,6 @@ def _free_id( ) -> None: self._get_allocator(type_, calculation_rate).free(id_) - def _get_next_sync_id(self) -> int: - with self._lock: - sync_id = self._sync_id - self._sync_id += 1 - if self._sync_id > self._sync_id_maximum: - self._sync_id = self._sync_id_minimum - return sync_id - def _handle_osc_callbacks(self, message: OscMessage) -> None: def _handle_done(message: OscMessage) -> None: if message.contents[0] in ( @@ -296,17 +297,22 @@ def query_buffer(self, buffer: Buffer) -> Union[Awaitable[BufferInfo], BufferInf def query_node(self, node: Node) -> Union[Awaitable[NodeInfo], NodeInfo]: raise NotImplementedError - def send(self, message: Union[OscBundle, OscMessage, Requestable]) -> None: + @abc.abstractmethod + def query_tree( + self, + ) -> Union[ + Awaitable[Union[QueryTreeGroup, QueryTreeSynth]], + Union[QueryTreeGroup, QueryTreeSynth], + ]: + raise NotImplementedError + + def send(self, message: SupportsOsc) -> None: if self._boot_status not in (BootStatus.BOOTING, BootStatus.ONLINE): raise ServerOffline self._osc_protocol.send( - message.to_osc() if isinstance(message, Requestable) else message + message.to_osc() if hasattr(message, "to_osc") else message ) - @abc.abstractmethod - def query_tree(self) -> Union[Awaitable[QueryTreeGroup], QueryTreeGroup]: - raise NotImplementedError - ### PUBLIC PROPERTIES ### @property @@ -327,7 +333,6 @@ def osc_protocol(self) -> OscProtocol: class Server(RealtimeContext): - ### INITIALIZER ### def __init__(self, options: Optional[Options] = None, **kwargs): @@ -337,7 +342,7 @@ def __init__(self, options: Optional[Options] = None, **kwargs): ### PRIVATE METHODS ### def _connect(self) -> None: - logger.info("connecting") + logger.info("Connecting") cast(ThreadedOscProtocol, self._osc_protocol).connect( ip_address=self._options.ip_address, port=self._options.port, @@ -353,10 +358,10 @@ def _connect(self) -> None: self._setup_system() self.sync() self._boot_status = BootStatus.ONLINE - logger.info("connected") + logger.info("Connected") def _disconnect(self) -> None: - logger.info("disconnecting") + logger.info("Disconnecting") self._boot_status = BootStatus.QUITTING self._teardown_shm() cast(ThreadedOscProtocol, self._osc_protocol).disconnect() @@ -366,22 +371,22 @@ def _disconnect(self) -> None: self._contexts.remove(self) self._is_owner = False self._boot_status = BootStatus.OFFLINE - logger.info("disconnected") + logger.info("Disconnected") def _setup_notifications(self) -> None: - response: Union[DoneResponse, FailResponse] = NotifyRequest(True).communicate( - server=self - ) - if isinstance(response, FailResponse): + logger.info("Setting up notifications") + response = ToggleNotifications(True).communicate(server=self) + if response is None or not isinstance(response, (DoneInfo, FailInfo)): + raise RuntimeError + if isinstance(response, FailInfo): self._shutdown() raise TooManyClients - if len(response.action) == 2: # supernova doesn't provide a max logins value - self._client_id, self._maximum_logins = ( - response.action[1], - self._options.maximum_logins, - ) + if len(response.other) == 1: # supernova doesn't provide a max logins value + self._client_id = int(response.other[0]) + self._maximum_logins = self._options.maximum_logins else: - self._client_id, self._maximum_logins = response.action[1:3] + self._client_id = int(response.other[0]) + self._maximum_logins = int(response.other[1]) def _shutdown(self): if self.is_owner: @@ -396,6 +401,7 @@ def boot(self, *, options: Optional[Options] = None, **kwargs) -> "Server": raise ServerOnline self._boot_status = BootStatus.BOOTING self._options = new(options or self._options, **kwargs) + logger.debug(f"Options: {self._options}") try: self._process_protocol.boot(self._options) except ServerCannotBoot: @@ -423,21 +429,30 @@ def disconnect(self) -> "Server": self._disconnect() return self - async def get_bus(self, bus: Bus) -> float: - raise NotImplementedError + def get_bus(self, bus: Bus) -> float: + if bus.calculation_rate != CalculationRate.CONTROL: + raise InvalidCalculationRate + return cast( + GetControlBusInfo, GetControlBus(bus_ids=[bus.id_]).communicate(server=self) + ).items[0][-1] def query_buffer(self, buffer: Buffer) -> BufferInfo: - request = BufferQueryRequest(buffer_ids=[buffer.id_]) - response = request.communicate(server=self) - return response + return cast( + BufferInfo, QueryBuffer(buffer_ids=[buffer.id_]).communicate(server=self) + ) def query_node(self, node: Node) -> NodeInfo: - return NodeQueryRequest(node_id=node).communicate(server=self) + return cast(NodeInfo, QueryNode(node_ids=[node.id_]).communicate(server=self)) - def query_tree(self) -> Union[Awaitable[QueryTreeGroup], QueryTreeGroup]: - request = GroupQueryTreeRequest(node_id=0, include_controls=True) - response = request.communicate(server=self) - return response.query_tree_group + def query_tree( + self, + ) -> Union[ + Awaitable[Union[QueryTreeGroup, QueryTreeSynth]], + Union[QueryTreeGroup, QueryTreeSynth], + ]: + return QueryTreeGroup.from_query_tree_info( + cast(QueryTreeInfo, QueryTree(items=[(0, True)]).communicate(server=self)) + ) def quit(self, force: bool = False) -> "Server": if self._boot_status != BootStatus.ONLINE: @@ -447,7 +462,7 @@ def quit(self, force: bool = False) -> "Server": "Cannot quit unowned server without force flag." ) try: - QuitRequest().communicate(server=self) + Quit().communicate(server=self) except OscProtocolOffline: pass self._teardown_shm() @@ -458,14 +473,13 @@ def quit(self, force: bool = False) -> "Server": def sync(self, sync_id: Optional[int] = None) -> "Server": if self._boot_status not in (BootStatus.BOOTING, BootStatus.ONLINE): raise ServerOffline - SyncRequest( + Sync( sync_id=sync_id if sync_id is not None else self._get_next_sync_id() ).communicate(server=self) return self class AsyncServer(RealtimeContext): - ### INITIALIZER ### def __init__(self, options: Optional[Options] = None, **kwargs): @@ -507,19 +521,18 @@ async def _disconnect(self) -> None: async def _setup_notifications(self) -> None: logger.info("Setting up notifications") - response: Union[DoneResponse, FailResponse] = await NotifyRequest( - True - ).communicate_async(server=self) - if isinstance(response, FailResponse): + response = await ToggleNotifications(True).communicate_async(server=self) + if response is None or not isinstance(response, (DoneInfo, FailInfo)): + raise RuntimeError + if isinstance(response, FailInfo): await self._shutdown() raise TooManyClients - if len(response.action) == 2: # supernova doesn't provide a max logins value - self._client_id, self._maximum_logins = ( - response.action[1], - self._options.maximum_logins, - ) + if len(response.other) == 1: # supernova doesn't provide a max logins value + self._client_id = int(response.other[0]) + self._maximum_logins = self._options.maximum_logins else: - self._client_id, self._maximum_logins = response.action[1:3] + self._client_id = int(response.other[0]) + self._maximum_logins = int(response.other[1]) async def _shutdown(self): if self.is_owner: @@ -536,6 +549,7 @@ async def boot( raise ServerOnline self._boot_status = BootStatus.BOOTING self._options = new(options or self._options, **kwargs) + logger.debug(f"Options: {self._options}") await self._process_protocol.boot(self._options) if not await self._process_protocol.boot_future: self._boot_status = BootStatus.OFFLINE @@ -564,20 +578,32 @@ async def disconnect(self) -> "AsyncServer": return self async def get_bus(self, bus: Bus) -> float: - raise NotImplementedError + if bus.calculation_rate != CalculationRate.CONTROL: + raise InvalidCalculationRate + return cast( + GetControlBusInfo, + await GetControlBus(bus_ids=[bus.id_]).communicate_async(server=self), + ).items[0][-1] async def query_buffer(self, buffer: Buffer) -> BufferInfo: - request = BufferQueryRequest(buffer_ids=[buffer.id_]) - response = await request.communicate_async(server=self) - return response + return cast( + BufferInfo, + await QueryBuffer(buffer_ids=[buffer.id_]).communicate_async(server=self), + ) async def query_node(self, node: Node) -> NodeInfo: - return await NodeQueryRequest(node_id=node).communicate_async(server=self) + return cast( + NodeInfo, + await QueryNode(node_ids=[node.id_]).communicate_async(server=self), + ) - async def query_tree(self) -> QueryTreeGroup: - request = GroupQueryTreeRequest(node_id=0, include_controls=True) - response = await request.communicate_async(server=self) - return response.query_tree_group + async def query_tree(self) -> Union[QueryTreeGroup, QueryTreeSynth]: + return QueryTreeGroup.from_query_tree_info( + cast( + QueryTreeInfo, + await QueryTree(items=[(0, True)]).communicate_async(server=self), + ) + ) async def quit(self, force: bool = False) -> "AsyncServer": if self._boot_status != BootStatus.ONLINE: @@ -587,7 +613,7 @@ async def quit(self, force: bool = False) -> "AsyncServer": "Cannot quit unowned server without force flag." ) try: - await QuitRequest().communicate_async(server=self, sync=True, timeout=1) + await Quit().communicate_async(server=self, sync=True, timeout=1) except (OscProtocolOffline, asyncio.TimeoutError): pass self._process_protocol.quit() @@ -597,7 +623,7 @@ async def quit(self, force: bool = False) -> "AsyncServer": async def sync(self, sync_id: Optional[int] = None) -> "AsyncServer": if self._boot_status not in (BootStatus.BOOTING, BootStatus.ONLINE): raise ServerOffline - await SyncRequest( + await Sync( sync_id=sync_id if sync_id is not None else self._get_next_sync_id() ).communicate_async(server=self) return self diff --git a/supriya/contexts/requests.py b/supriya/contexts/requests.py new file mode 100644 index 000000000..abb41b1b0 --- /dev/null +++ b/supriya/contexts/requests.py @@ -0,0 +1,1433 @@ +import asyncio +import dataclasses +from abc import ABC, abstractmethod +from concurrent.futures import Future +from os import PathLike +from typing import ( + TYPE_CHECKING, + Dict, + List, + Optional, + Sequence, + SupportsFloat, + SupportsInt, + Tuple, + Union, +) + +try: + from typing import Literal +except ImportError: + from typing_extensions import Literal # type: ignore + +from uqbar.objects import new + +from ..enums import AddAction, HeaderFormat, RequestName, SampleFormat +from ..osc import OscBundle, OscMessage +from ..synthdefs import SynthDef, SynthDefCompiler +from ..typing import AddActionLike, HeaderFormatLike, SampleFormatLike, SupportsOsc +from .responses import Response + +if TYPE_CHECKING: + from .contexts.core import Context + from .contexts.realtime import AsyncServer, Server + + +class Requestable(ABC): + ### PRIVATE METHODS ### + + @abstractmethod + def _get_response_patterns_and_requestable( + self, context: "Context" + ) -> Tuple[ + Optional[Sequence[Union[float, str]]], + Optional[Sequence[Union[float, str]]], + "Requestable", + ]: + raise NotImplementedError + + ### PUBLIC METHODS ### + + def communicate( + self, server: "Server", sync=True, timeout=1.0 + ) -> Optional[Response]: + if not sync: + server.send(self) + return None + ( + success_pattern, + failure_pattern, + requestable, + ) = self._get_response_patterns_and_requestable(server) + if not success_pattern: + server.send(self) + return None + future: Future[Response] = Future() + server._osc_protocol.register( + pattern=success_pattern, + failure_pattern=failure_pattern, + procedure=lambda message: future.set_result(Response.from_osc(message)), + once=True, + ) + server.send(requestable) + return future.result(timeout=timeout) + + async def communicate_async( + self, server: "AsyncServer", sync=True, timeout=1.0 + ) -> Optional[Response]: + if not sync: + server.send(self) + return None + ( + success_pattern, + failure_pattern, + requestable, + ) = self._get_response_patterns_and_requestable(server) + if not sync or not success_pattern: + server.send(self) + return None + future: asyncio.Future[Response] = asyncio.get_running_loop().create_future() + server._osc_protocol.register( + pattern=success_pattern, + failure_pattern=failure_pattern, + procedure=lambda message: future.set_result(Response.from_osc(message)), + once=True, + ) + server.send(requestable) + await asyncio.wait_for(future, timeout=timeout) + return future.result() + + @abstractmethod + def to_osc(self) -> Union[OscBundle, OscMessage]: + raise NotImplementedError + + +@dataclasses.dataclass +class Request(Requestable): + ### PRIVATE METHODS ### + + def _get_response_patterns( + self, + ) -> Tuple[ + Optional[Sequence[Union[float, str]]], Optional[Sequence[Union[float, str]]] + ]: + return None, None + + def _get_response_patterns_and_requestable( + self, context: "Context" + ) -> Tuple[ + Optional[Sequence[Union[float, str]]], + Optional[Sequence[Union[float, str]]], + "Requestable", + ]: + success_pattern, failure_pattern = self._get_response_patterns() + return success_pattern, failure_pattern, self + + ### PUBLIC METHODS ### + + @classmethod + def merge(cls, requests: List["Request"]) -> List["Request"]: + return requests + + +@dataclasses.dataclass +class RequestBundle(Requestable): + contents: Sequence[Requestable] + timestamp: Optional[float] = None + + ### PRIVATE METHODS ### + + def _get_response_patterns_and_requestable( + self, context: "Context" + ) -> Tuple[ + Optional[Sequence[Union[float, str]]], + Optional[Sequence[Union[float, str]]], + "Requestable", + ]: + sync_id = context._get_next_sync_id() + request_bundle: "RequestBundle" = new( + self, contents=list(self.contents) + [Sync(sync_id=sync_id)] + ) + response_pattern: List[Union[float, str]] = ["/synced", sync_id] + return response_pattern, None, request_bundle + + ### PUBLIC METHODS ### + + def to_osc(self) -> OscBundle: + return OscBundle( + contents=[x.to_osc() for x in self.contents], timestamp=self.timestamp + ) + + +@dataclasses.dataclass +class AllocateBuffer(Request): + """ + A ``/b_alloc`` request. + """ + + buffer_id: SupportsInt + frame_count: int + channel_count: int = 1 + on_completion: Optional[SupportsOsc] = None + + def _get_response_patterns(self): + return ["/done", "/b_alloc", self.buffer_id], None + + def to_osc(self) -> OscMessage: + contents: List[Union[OscBundle, OscMessage, int]] = [ + int(self.buffer_id), + int(self.frame_count), + int(self.channel_count), + ] + if self.on_completion: + contents.append(self.on_completion.to_osc()) + return OscMessage(RequestName.BUFFER_ALLOCATE, *contents) + + +@dataclasses.dataclass +class AllocateReadBuffer(Request): + """ + A ``/b_allocRead`` request. + """ + + buffer_id: SupportsInt + path: PathLike + starting_frame: int = 0 + frame_count: int = 0 + on_completion: Optional[SupportsOsc] = None + + def _get_response_patterns(self): + return ["/done", "/b_allocRead", self.buffer_id], None + + def to_osc(self) -> OscMessage: + contents: List[Union[OscBundle, OscMessage, int, str]] = [ + int(self.buffer_id), + str(self.path), + int(self.starting_frame), + int(self.frame_count), + ] + if self.on_completion: + contents.append(self.on_completion.to_osc()) + return OscMessage(RequestName.BUFFER_ALLOCATE_READ, *contents) + + +@dataclasses.dataclass +class AllocateReadBufferChannel(Request): + """ + A ``/b_allocReadChannel`` request. + """ + + buffer_id: SupportsInt + path: PathLike + channel_indices: Sequence[int] + starting_frame: int = 0 + frame_count: int = 0 + on_completion: Optional[SupportsOsc] = None + + def _get_response_patterns(self): + return ["/done", "/b_allocReadChannel", self.buffer_id], None + + def to_osc(self) -> OscMessage: + contents: List[Union[OscBundle, OscMessage, int, str]] = [ + int(self.buffer_id), + str(self.path), + int(self.starting_frame), + int(self.frame_count), + *(int(channel_index) for channel_index in self.channel_indices), + ] + if self.on_completion: + contents.append(self.on_completion.to_osc()) + return OscMessage(RequestName.BUFFER_ALLOCATE_READ_CHANNEL, *contents) + + +@dataclasses.dataclass +class AutoReassignSynthID(Request): + """ + A ``/s_noid`` request. + """ + + synth_ids: Sequence[SupportsInt] + + def to_osc(self) -> OscMessage: + return OscMessage( + RequestName.SYNTH_NOID, *(int(synth_id) for synth_id in self.synth_ids) + ) + + +@dataclasses.dataclass +class ClearSchedule(Request): + """ + A ``/clearSched`` request. + """ + + def to_osc(self) -> OscMessage: + return OscMessage(RequestName.CLEAR_SCHEDULE) + + +@dataclasses.dataclass +class CloseBuffer(Request): + """ + A ``/b_close`` request. + """ + + buffer_id: SupportsInt + on_completion: Optional[Requestable] = None + + def _get_response_patterns(self): + return ["/done", "/b_close", self.buffer_id], None + + def to_osc(self) -> OscMessage: + contents: List[Union[OscBundle, OscMessage, int]] = [int(self.buffer_id)] + if self.on_completion: + contents.append(self.on_completion.to_osc()) + return OscMessage(RequestName.BUFFER_CLOSE, *contents) + + +@dataclasses.dataclass +class CopyBuffer(Request): + """ + A ``/b_gen`` ``copy`` request. + """ + + source_buffer_id: SupportsInt + target_buffer_id: SupportsInt + frame_count: int = -1 + source_starting_frame: int = 0 + target_starting_frame: int = 0 + + def _get_response_patterns(self): + return ["/done", "/b_gen", self.buffer_id], None + + def to_osc(self) -> OscMessage: + return OscMessage( + RequestName.BUFFER_GENERATE, + int(self.target_buffer_id), + "copy", + int(self.target_starting_frame), + int(self.source_buffer_id), + int(self.source_starting_frame), + int(self.frame_count), + ) + + +@dataclasses.dataclass +class DoNothing(Request): + """ + A "nothing" request. + """ + + def to_osc(self) -> OscMessage: + return OscMessage(RequestName.NOTHING) + + +@dataclasses.dataclass +class DumpOsc(Request): + """ + A ``/dumpOSC`` request. + """ + + code: int + + def to_osc(self) -> OscMessage: + return OscMessage(RequestName.DUMP_OSC, self.code) + + +@dataclasses.dataclass +class DumpTree(Request): + """ + A ``/g_dumpTree`` request. + """ + + items: Sequence[Tuple[SupportsInt, bool]] + + def to_osc(self) -> OscMessage: + contents = [] + for node_id, flag in self.items: + contents.extend([int(node_id), bool(flag)]) + return OscMessage(RequestName.GROUP_DUMP_TREE, *contents) + + +@dataclasses.dataclass +class FillBuffer(Request): + """ + A ``/b_fill`` request. + """ + + buffer_id: SupportsInt + items: Sequence[Tuple[int, int, float]] + + @classmethod + def merge(cls, requests: List["Request"]) -> List["Request"]: + items_by_buffer_id: Dict[int, List[Tuple[int, int, float]]] = {} + for request in requests: + if isinstance(request, cls): + items_by_buffer_id.setdefault(int(request.buffer_id), []).extend( + request.items + ) + return [ + cls(buffer_id=buffer_id, items=items) + for buffer_id, items in sorted(items_by_buffer_id.items()) + ] + + def to_osc(self) -> OscMessage: + contents: List[Union[float]] = [int(self.buffer_id)] + for index, count, value in self.items: + contents.extend([int(index), int(count), float(value)]) + return OscMessage(RequestName.BUFFER_FILL, *contents) + + +@dataclasses.dataclass +class FillControlBusRange(Request): + """ + A ``/c_fill`` request. + """ + + items: Sequence[Tuple[SupportsInt, int, float]] + + @classmethod + def merge(cls, requests: List["Request"]) -> List["Request"]: + items: List[Tuple[SupportsInt, int, float]] = [] + for request in requests: + if not isinstance(request, cls): + continue + items.extend(request.items) + return [cls(items=items)] + + def to_osc(self) -> OscMessage: + contents: List[float] = [] + for index, count, value in self.items: + contents.extend([int(index), int(count), float(value)]) + return OscMessage(RequestName.CONTROL_BUS_FILL, *contents) + + +@dataclasses.dataclass +class FillNode(Request): + """ + A ``/n_fill`` request. + """ + + node_id: SupportsInt + items: Sequence[Tuple[Union[int, str], int, float]] + + def to_osc(self) -> OscMessage: + contents: List[Union[float, str]] = [int(self.node_id)] + for control, count, value in self.items: + contents.extend( + [ + control if isinstance(control, str) else int(control), + int(count), + float(value), + ] + ) + return OscMessage(RequestName.NODE_FILL, *contents) + + +@dataclasses.dataclass +class FreeAllSynthDefs(Request): + """ + A ``/d_freeAll`` request. + """ + + def to_osc(self) -> OscMessage: + return OscMessage(RequestName.SYNTHDEF_FREE_ALL) + + +@dataclasses.dataclass +class FreeBuffer(Request): + """ + A ``/b_free`` request. + """ + + buffer_id: SupportsInt + on_completion: Optional[Requestable] = None + + def to_osc(self) -> OscMessage: + contents: List[Union[OscBundle, OscMessage, int]] = [int(self.buffer_id)] + if self.on_completion: + contents.append(self.on_completion.to_osc()) + return OscMessage(RequestName.BUFFER_FREE, *contents) + + +@dataclasses.dataclass +class FreeGroupChildren(Request): + """ + A ``/g_freeAll`` request. + """ + + node_ids: Sequence[SupportsInt] + + def to_osc(self) -> OscMessage: + return OscMessage( + RequestName.GROUP_FREE_ALL, *(int(node_id) for node_id in self.node_ids) + ) + + +@dataclasses.dataclass +class FreeGroupDeep(Request): + """ + A ``/g_deepFree`` request. + """ + + node_ids: Sequence[SupportsInt] + + def to_osc(self) -> OscMessage: + return OscMessage( + RequestName.GROUP_DEEP_FREE, *(int(node_id) for node_id in self.node_ids) + ) + + +@dataclasses.dataclass +class FreeNode(Request): + """ + A ``/n_free`` request. + """ + + node_ids: Sequence[SupportsInt] + + def _get_response_patterns(self): + return ["/n_end", int(self.node_ids[-1])], None + + def to_osc(self) -> OscMessage: + return OscMessage( + RequestName.NODE_FREE, *(int(node_id) for node_id in self.node_ids) + ) + + +@dataclasses.dataclass +class FreeSynthDef(Request): + """ + A ``/d_free`` request. + """ + + synthdefs: Sequence[Union[SynthDef, str]] + + def to_osc(self) -> OscMessage: + contents = [] + for x in self.synthdefs: + if isinstance(x, SynthDef): + contents.append(x.actual_name) + else: + contents.append(x) + return OscMessage(RequestName.SYNTHDEF_FREE, *contents) + + +@dataclasses.dataclass +class GenerateBuffer(Request): + """ + A ``/b_gen`` request, for use with ``sine1``, ``sine2``, ``sine3`` + and ``cheby`` commands. + """ + + buffer_id: SupportsInt + command_name: Literal["sine1", "sine2", "sine3", "cheby"] + amplitudes: Sequence[float] + frequencies: Optional[Sequence[float]] + phases: Optional[Sequence[float]] + as_wavetable: bool = False + should_clear_first: bool = False + should_normalize: bool = False + + def _get_response_patterns(self): + return ["/done", "/b_gen", self.buffer_id], None + + def to_osc(self) -> OscMessage: + contents: List[Union[str, float]] = [ + int(self.buffer_id), + self.command_name, + ( + self.should_normalize + | (self.as_wavetable * 2) + | (self.should_clear_first * 4) + ), + ] + sequences = [] + if self.frequencies: + sequences.append(self.frequencies) + sequences.append(self.amplitudes) + if self.phases: + sequences.append(self.phases) + for values in zip(*sequences): + for value in values: + contents.append(float(value)) + return OscMessage(RequestName.BUFFER_GENERATE, *contents) + + +@dataclasses.dataclass +class GetBuffer(Request): + """ + A ``/b_get`` request. + """ + + buffer_id: SupportsInt + indices: Sequence[int] + + def to_osc(self) -> OscMessage: + return OscMessage( + RequestName.BUFFER_GET, + int(self.buffer_id), + *(int(index) for index in self.indices), + ) + + +@dataclasses.dataclass +class GetBufferRange(Request): + """ + A ``/b_getn`` request. + """ + + buffer_id: SupportsInt + items: Sequence[Tuple[int, int]] + + def to_osc(self) -> OscMessage: + contents: List[int] = [int(self.buffer_id)] + for index, count in self.items: + contents.extend([int(index), int(count)]) + return OscMessage(RequestName.BUFFER_GET, *contents) + + +@dataclasses.dataclass +class GetControlBus(Request): + """ + A ``/c_get`` request. + """ + + bus_ids: Sequence[SupportsInt] + + def to_osc(self) -> OscMessage: + return OscMessage( + RequestName.CONTROL_BUS_GET, *(int(bus_id) for bus_id in self.bus_ids) + ) + + +@dataclasses.dataclass +class GetControlBusRange(Request): + """ + A ``/c_getn`` request. + """ + + items: Sequence[Tuple[SupportsInt, int]] + + def to_osc(self) -> OscMessage: + contents: List[int] = [] + for index, count in self.items: + contents.extend([int(index), int(count)]) + return OscMessage(RequestName.CONTROL_BUS_GET_CONTIGUOUS, *contents) + + +@dataclasses.dataclass +class GetStatus(Request): + """ + A ``/status`` request. + """ + + def _get_response_patterns(self): + return ["/status.reply"], None + + def to_osc(self) -> OscMessage: + return OscMessage(RequestName.STATUS) + + +@dataclasses.dataclass +class GetSynthControl(Request): + """ + A ``/s_get`` request. + """ + + synth_id: SupportsInt + controls: Sequence[Union[int, str]] + + def to_osc(self) -> OscMessage: + contents: List[Union[int, str]] = [int(self.synth_id)] + for control in self.controls: + contents.append(control if isinstance(control, str) else int(control)) + return OscMessage(RequestName.SYNTH_GET, *contents) + + +@dataclasses.dataclass +class GetSynthControlRange(Request): + """ + A ``/s_getn`` request. + """ + + synth_id: SupportsInt + items: Sequence[Tuple[Union[int, str], int]] + + def to_osc(self) -> OscMessage: + contents: List[Union[int, str]] = [int(self.synth_id)] + for control, count in self.items: + contents.extend( + [control if isinstance(control, str) else int(control), int(count)] + ) + return OscMessage(RequestName.SYNTH_GET_CONTIGUOUS, *contents) + + +@dataclasses.dataclass +class GetVersion(Request): + """ + A ``/version`` request. + """ + + def _get_response_patterns(self): + return ["/version.reply"], None + + def to_osc(self) -> OscMessage: + return OscMessage(RequestName.VERSION) + + +@dataclasses.dataclass +class LoadSynthDefs(Request): + """ + A ``/d_load`` request. + """ + + path: PathLike + on_completion: Optional[Requestable] = None + + def to_osc(self) -> OscMessage: + contents: List[Union[OscBundle, OscMessage, str]] = [str(self.path)] + if self.on_completion: + contents.append(self.on_completion.to_osc()) + return OscMessage(RequestName.SYNTHDEF_LOAD, *contents) + + +@dataclasses.dataclass +class LoadSynthDefDirectory(Request): + """ + A ``/d_loadDir`` request. + """ + + path: PathLike + on_completion: Optional[Requestable] = None + + def to_osc(self) -> OscMessage: + contents: List[Union[OscBundle, OscMessage, str]] = [str(self.path)] + if self.on_completion: + contents.append(self.on_completion.to_osc()) + return OscMessage(RequestName.SYNTHDEF_LOAD_DIR, *contents) + + +@dataclasses.dataclass +class MapAudioBusToNode(Request): + """ + A ``/n_mapa`` request. + """ + + node_id: SupportsInt + items: Sequence[Tuple[Union[int, str], SupportsInt]] + + def to_osc(self) -> OscMessage: + contents: List[Union[int, str]] = [int(self.node_id)] + for index_or_name, bus_index in self.items: + contents.extend( + [ + index_or_name + if isinstance(index_or_name, str) + else int(index_or_name), + int(bus_index), + ] + ) + return OscMessage(RequestName.NODE_MAP_TO_AUDIO_BUS, *contents) + + +@dataclasses.dataclass +class MapAudioBusRangeToNode(Request): + """ + A ``/n_mapan`` request. + """ + + node_id: SupportsInt + items: Sequence[Tuple[Union[int, str], SupportsInt, int]] + + def to_osc(self) -> OscMessage: + contents: List[Union[int, str]] = [int(self.node_id)] + for index_or_name, bus_index, count in self.items: + contents.extend( + [ + index_or_name + if isinstance(index_or_name, str) + else int(index_or_name), + int(bus_index), + int(count), + ] + ) + return OscMessage(RequestName.NODE_MAP_TO_AUDIO_BUS_CONTIGUOUS, *contents) + + +@dataclasses.dataclass +class MapControlBusToNode(Request): + """ + A ``/n_map`` request. + """ + + node_id: SupportsInt + items: Sequence[Tuple[Union[int, str], SupportsInt]] + + def to_osc(self) -> OscMessage: + contents: List[Union[int, str]] = [int(self.node_id)] + for index_or_name, bus_index in self.items: + contents.extend( + [ + index_or_name + if isinstance(index_or_name, str) + else int(index_or_name), + int(bus_index), + ] + ) + return OscMessage(RequestName.NODE_MAP_TO_CONTROL_BUS, *contents) + + +@dataclasses.dataclass +class MapControlBusRangeToNode(Request): + """ + A ``/n_mapn`` request. + """ + + node_id: SupportsInt + items: Sequence[Tuple[Union[int, str], SupportsInt, int]] + + def to_osc(self) -> OscMessage: + contents: List[Union[int, str]] = [int(self.node_id)] + for index_or_name, bus_index, count in self.items: + contents.extend( + [ + index_or_name + if isinstance(index_or_name, str) + else int(index_or_name), + int(bus_index), + int(count), + ] + ) + return OscMessage(RequestName.NODE_MAP_TO_CONTROL_BUS_CONTIGUOUS, *contents) + + +@dataclasses.dataclass +class MoveNodeAfter(Request): + """ + A ``/n_after`` request. + """ + + items: Sequence[Tuple[SupportsInt, SupportsInt]] + + def to_osc(self) -> OscMessage: + contents: List[int] = [] + for node_id, target_node_id in self.items: + contents.extend([int(node_id), int(target_node_id)]) + return OscMessage(RequestName.NODE_AFTER, *contents) + + +@dataclasses.dataclass +class MoveNodeBefore(Request): + """ + A ``/n_before`` request. + """ + + items: Sequence[Tuple[SupportsInt, SupportsInt]] + + def to_osc(self) -> OscMessage: + contents: List[int] = [] + for node_id, target_node_id in self.items: + contents.extend([int(node_id), int(target_node_id)]) + return OscMessage(RequestName.NODE_BEFORE, *contents) + + +@dataclasses.dataclass +class MoveNodeToGroupHead(Request): + """ + A ``/g_head`` request. + """ + + items: Sequence[Tuple[SupportsInt, SupportsInt]] + + def to_osc(self) -> OscMessage: + contents: List[int] = [] + for node_id, target_group_id in self.items: + contents.extend([int(target_group_id), int(node_id)]) + return OscMessage(RequestName.GROUP_HEAD, *contents) + + +@dataclasses.dataclass +class MoveNodeToGroupTail(Request): + """ + A ``/g_tail`` request. + """ + + items: Sequence[Tuple[SupportsInt, SupportsInt]] + + def to_osc(self) -> OscMessage: + contents: List[int] = [] + for node_id, target_group_id in self.items: + contents.extend([int(target_group_id), int(node_id)]) + return OscMessage(RequestName.GROUP_TAIL, *contents) + + +@dataclasses.dataclass +class NewGroup(Request): + """ + A ``/g_new`` request. + """ + + items: Sequence[Tuple[SupportsInt, AddActionLike, SupportsInt]] + + @classmethod + def merge(cls, requests: List["Request"]) -> List["Request"]: + items: List[Tuple[SupportsInt, AddActionLike, SupportsInt]] = [] + for request in requests: + if not isinstance(request, cls): + continue + items.extend(request.items) + return [cls(items=items)] + + def to_osc(self) -> OscMessage: + contents = [] + for group_id, add_action, target_node_id in self.items: + contents.extend( + [int(group_id), AddAction.from_expr(add_action), int(target_node_id)] + ) + return OscMessage(RequestName.GROUP_NEW, *contents) + + +@dataclasses.dataclass +class NewParallelGroup(Request): + """ + A ``/p_new`` request. + """ + + items: Sequence[Tuple[SupportsInt, AddActionLike, SupportsInt]] + + @classmethod + def merge(cls, requests: List["Request"]) -> List["Request"]: + items: List[Tuple[SupportsInt, AddActionLike, SupportsInt]] = [] + for request in requests: + if not isinstance(request, cls): + continue + items.extend(request.items) + return [cls(items=items)] + + def to_osc(self) -> OscMessage: + contents = [] + for group_id, add_action, target_node_id in self.items: + contents.extend( + [int(group_id), AddAction.from_expr(add_action), int(target_node_id)] + ) + return OscMessage(RequestName.PARALLEL_GROUP_NEW, *contents) + + +@dataclasses.dataclass +class NewSynth(Request): + """ + A ``/s_new`` request. + """ + + synthdef: Union[SynthDef, str] + synth_id: SupportsInt + add_action: AddActionLike + target_node_id: SupportsInt + controls: Dict[Union[int, str], Union[SupportsFloat, str]] = dataclasses.field( + default_factory=dict + ) + + def to_osc(self) -> OscMessage: + contents: List[Union[float, str]] = [ + self.synthdef.actual_name + if isinstance(self.synthdef, SynthDef) + else self.synthdef, + int(self.synth_id), + AddAction.from_expr(self.add_action), + int(self.target_node_id), + ] + for key, value in self.controls.items(): + contents.append(key if isinstance(key, str) else int(key)) + contents.append(value if isinstance(value, str) else float(value)) + return OscMessage(RequestName.SYNTH_NEW, *contents) + + +@dataclasses.dataclass +class NormalizeBuffer(Request): + """ + A ``/b_gen`` ``normalize`` (or ``wnormalize``) request. + """ + + buffer_id: SupportsInt + new_maximum: float = 1.0 + as_wavetable: bool = False + + def _get_response_patterns(self): + return ["/done", "/b_gen", self.buffer_id], None + + def to_osc(self) -> OscMessage: + return OscMessage( + RequestName.BUFFER_GENERATE, + int(self.buffer_id), + "wnormalize" if self.as_wavetable else "normalize", + float(self.new_maximum), + ) + + +@dataclasses.dataclass +class OrderNodes(Request): + """ + A ``/n_order`` request. + """ + + add_action: AddActionLike + target_node_id: SupportsInt + node_ids: Sequence[SupportsInt] + + def to_osc(self) -> OscMessage: + return OscMessage( + RequestName.NODE_ORDER, + AddAction.from_expr(self.add_action), + int(self.target_node_id), + *(int(node_id) for node_id in self.node_ids), + ) + + +@dataclasses.dataclass +class QueryBuffer(Request): + """ + A ``/b_query`` request. + """ + + buffer_ids: Sequence[int] + + def _get_response_patterns(self): + # TODO: We should be able to gather multiple responses + return ["/b_info", self.buffer_ids[-1]], None + + def to_osc(self) -> OscMessage: + return OscMessage( + RequestName.BUFFER_QUERY, *(int(buffer_id) for buffer_id in self.buffer_ids) + ) + + +@dataclasses.dataclass +class QueryNode(Request): + """ + A ``/n_query`` request. + """ + + node_ids: Sequence[SupportsInt] + + def _get_response_patterns(self): + # TODO: We should be able to gather multiple responses + return ["/n_info", int(self.node_ids[-1])], None + + def to_osc(self) -> OscMessage: + return OscMessage( + RequestName.NODE_QUERY, *(int(node_id) for node_id in self.node_ids) + ) + + +@dataclasses.dataclass +class QueryTree(Request): + """ + A ``/g_queryTree`` request. + """ + + items: Sequence[Tuple[SupportsInt, bool]] + + def _get_response_patterns(self): + # TODO: We should be able to gather multiple responses + return [ + "/g_queryTree.reply", + int(self.items[-1][1]), + int(self.items[-1][0]), + ], None + + def to_osc(self) -> OscMessage: + contents: List[int] = [] + for node_id, flag in self.items: + contents.extend([int(node_id), int(bool(flag))]) + return OscMessage(RequestName.GROUP_QUERY_TREE, *contents) + + +@dataclasses.dataclass +class Quit(Request): + """ + A ``/quit`` request. + """ + + def _get_response_patterns(self): + return ["/done", "/quit"], None + + def to_osc(self) -> OscMessage: + return OscMessage(RequestName.QUIT) + + +@dataclasses.dataclass +class ReadBuffer(Request): + """ + A ``/b_read`` request. + """ + + buffer_id: SupportsInt + path: PathLike + frame_count: int = -1 + leave_open: bool = False + starting_frame_in_buffer: int = 0 + starting_frame_in_file: int = 0 + on_completion: Optional[Requestable] = None + + def _get_response_patterns(self): + return ["/done", "/b_read", self.buffer_id], None + + def to_osc(self) -> OscMessage: + contents: List[Union[OscBundle, OscMessage, int, str]] = [ + int(self.buffer_id), + str(self.path), + int(self.starting_frame_in_file), + int(self.frame_count), + int(self.starting_frame_in_buffer), + int(bool(self.leave_open)), + ] + if self.on_completion: + contents.append(self.on_completion.to_osc()) + return OscMessage(RequestName.BUFFER_READ, *contents) + + +@dataclasses.dataclass +class ReadBufferChannel(Request): + """ + A ``/b_readChannel`` request. + """ + + buffer_id: SupportsInt + path: PathLike + channel_indices: Optional[Sequence[int]] = None + frame_count: int = -1 + leave_open: bool = False + starting_frame_in_buffer: int = 0 + starting_frame_in_file: int = 0 + on_completion: Optional[Requestable] = None + + def _get_response_patterns(self): + return ["/done", "/b_readChannel", self.buffer_id], None + + def to_osc(self) -> OscMessage: + contents: List[Union[OscBundle, OscMessage, int, str]] = [ + int(self.buffer_id), + str(self.path), + int(self.starting_frame_in_file), + int(self.frame_count), + int(self.starting_frame_in_buffer), + int(bool(self.leave_open)), + *(int(channel_index) for channel_index in self.channel_indices or []), + ] + if self.on_completion: + contents.append(self.on_completion.to_osc()) + return OscMessage(RequestName.BUFFER_READ_CHANNEL, *contents) + + +@dataclasses.dataclass +class ReceiveSynthDefs(Request): + """ + A ``/d_recv`` request. + """ + + synthdefs: Sequence[SynthDef] + on_completion: Optional[Requestable] = None + + def to_osc(self) -> OscMessage: + contents = [SynthDefCompiler.compile_synthdefs(self.synthdefs)] + if self.on_completion: + contents.append(self.on_completion.to_osc()) + return OscMessage(RequestName.SYNTHDEF_RECEIVE, *contents) + + +@dataclasses.dataclass +class ReleaseNode(Request): + node_id: SupportsInt + has_gate: bool + + def to_osc(self) -> OscMessage: + if self.has_gate: + return SetNodeControl(node_id=self.node_id, items=[("gate", 0)]).to_osc() + return FreeNode(node_ids=[self.node_id]).to_osc() + + +@dataclasses.dataclass +class RunNode(Request): + """ + A ``/n_run`` request. + """ + + items: Sequence[Tuple[SupportsInt, bool]] + + @classmethod + def merge(cls, requests: List["Request"]) -> List["Request"]: + items: List[Tuple[SupportsInt, bool]] = [] + for request in requests: + if not isinstance(request, cls): + continue + items.extend(request.items) + return [cls(items=items)] + + def to_osc(self) -> OscMessage: + contents: List[int] = [] + for node_id, flag in self.items: + contents.extend([int(node_id), int(flag)]) + return OscMessage(RequestName.NODE_RUN, *contents) + + +@dataclasses.dataclass +class SetBuffer(Request): + """ + A ``/b_set`` request. + """ + + buffer_id: SupportsInt + items: Sequence[Tuple[int, float]] + + @classmethod + def merge(cls, requests: List["Request"]) -> List["Request"]: + items_by_buffer_id: Dict[int, List[Tuple[int, float]]] = {} + for request in requests: + if isinstance(request, cls): + items_by_buffer_id.setdefault(int(request.buffer_id), []).extend( + request.items + ) + return [ + cls(buffer_id=buffer_id, items=items) + for buffer_id, items in sorted(items_by_buffer_id.items()) + ] + + def to_osc(self) -> OscMessage: + contents: List[float] = [int(self.buffer_id)] + for index, value in self.items: + contents.extend([int(index), float(value)]) + return OscMessage(RequestName.BUFFER_SET, *contents) + + +@dataclasses.dataclass +class SetBufferRange(Request): + """ + A ``/b_setn`` request. + """ + + buffer_id: SupportsInt + items: Sequence[Tuple[int, Sequence[float]]] + + @classmethod + def merge(cls, requests: List["Request"]) -> List["Request"]: + items_by_buffer_id: Dict[int, List[Tuple[int, Sequence[float]]]] = {} + for request in requests: + if isinstance(request, cls): + items_by_buffer_id.setdefault(int(request.buffer_id), []).extend( + request.items + ) + return [ + cls(buffer_id=buffer_id, items=items) + for buffer_id, items in sorted(items_by_buffer_id.items()) + ] + + def to_osc(self) -> OscMessage: + contents: List[float] = [int(self.buffer_id)] + for index, values in self.items: + contents.extend( + [int(index), len(values), *(float(value) for value in values)] + ) + return OscMessage(RequestName.BUFFER_SET_CONTIGUOUS, *contents) + + +@dataclasses.dataclass +class SetControlBus(Request): + """ + A ``/c_set`` request. + """ + + items: Sequence[Tuple[SupportsInt, float]] + + @classmethod + def merge(cls, requests: List["Request"]) -> List["Request"]: + items: List[Tuple[SupportsInt, float]] = [] + for request in requests: + if not isinstance(request, cls): + continue + items.extend(request.items) + return [cls(items=items)] + + def to_osc(self) -> OscMessage: + contents: List[float] = [] + for index, value in self.items: + contents.extend([int(index), float(value)]) + return OscMessage(RequestName.CONTROL_BUS_SET, *contents) + + +@dataclasses.dataclass +class SetControlBusRange(Request): + """ + A ``/c_setn`` request. + """ + + items: Sequence[Tuple[SupportsInt, Sequence[float]]] + + @classmethod + def merge(cls, requests: List["Request"]) -> List["Request"]: + items: List[Tuple[SupportsInt, Sequence[float]]] = [] + for request in requests: + if not isinstance(request, cls): + continue + items.extend(request.items) + return [cls(items=items)] + + def to_osc(self) -> OscMessage: + contents: List[float] = [] + for index, values in self.items: + contents.extend( + [int(index), len(values), *(float(value) for value in values)] + ) + return OscMessage(RequestName.CONTROL_BUS_SET_CONTIGUOUS, *contents) + + +@dataclasses.dataclass +class SetNodeControl(Request): + """ + A ``/n_set`` request. + """ + + node_id: SupportsInt + items: Sequence[Tuple[Union[int, str], float]] + + def to_osc(self) -> OscMessage: + contents: List[Union[float, str]] = [int(self.node_id)] + for control, value in self.items: + contents.extend( + [control if isinstance(control, str) else int(control), float(value)] + ) + return OscMessage(RequestName.NODE_SET, *contents) + + +@dataclasses.dataclass +class SetNodeControlRange(Request): + """ + A ``/n_setn`` request. + """ + + node_id: SupportsInt + items: Sequence[Tuple[Union[int, str], Sequence[float]]] + + def to_osc(self) -> OscMessage: + contents: List[Union[float, str]] = [int(self.node_id)] + for control, values in self.items: + contents.extend( + [ + control if isinstance(control, str) else int(control), + len(values), + *(float(value) for value in values), + ] + ) + return OscMessage(RequestName.NODE_SET_CONTIGUOUS, *contents) + + +@dataclasses.dataclass +class Sync(Request): + """ + A ``/sync`` request. + """ + + sync_id: int + + def _get_response_patterns(self): + return ["/synced", self.sync_id], None + + def to_osc(self) -> OscMessage: + return OscMessage(RequestName.SYNC, int(self.sync_id)) + + +@dataclasses.dataclass +class ToggleErrorReporting(Request): + """ + A ``/error`` request. + """ + + code: int + + def to_osc(self) -> OscMessage: + return OscMessage(RequestName.ERROR, int(self.code)) + + +@dataclasses.dataclass +class ToggleNotifications(Request): + """ + A ``/notify`` request. + """ + + should_notify: bool + client_id: Optional[int] = None + + def _get_response_patterns(self): + return ["/done", "/notify"], ["/fail", "/notify"] + + def to_osc(self) -> OscMessage: + contents: List[int] = [int(bool(self.should_notify))] + if self.client_id is not None: + contents.append(int(self.client_id)) + return OscMessage(RequestName.NOTIFY, *contents) + + +@dataclasses.dataclass +class TraceNode(Request): + """ + A ``/n_trace`` request. + """ + + node_ids: Sequence[SupportsInt] + + def to_osc(self) -> OscMessage: + return OscMessage( + RequestName.NODE_TRACE, *(int(node_id) for node_id in self.node_ids) + ) + + +@dataclasses.dataclass +class WriteBuffer(Request): + """ + A ``/b_write`` request. + """ + + buffer_id: SupportsInt + path: PathLike + header_format: HeaderFormatLike + sample_format: SampleFormatLike + frame_count: int = -1 + starting_frame: int = 0 + leave_open: bool = False + on_completion: Optional[Requestable] = None + + def _get_response_patterns(self): + return ["/done", "/b_write", self.buffer_id], None + + def to_osc(self) -> OscMessage: + contents: List[Union[OscBundle, OscMessage, int, str]] = [ + int(self.buffer_id), + str(self.path), + HeaderFormat.from_expr(self.header_format).name.lower(), + SampleFormat.from_expr(self.sample_format).name.lower(), + int(self.frame_count), + int(self.starting_frame), + int(bool(self.leave_open)), + ] + if self.on_completion: + contents.append(self.on_completion.to_osc()) + return OscMessage(RequestName.BUFFER_WRITE, *contents) + + +@dataclasses.dataclass +class ZeroBuffer(Request): + """ + A ``/b_zero`` request. + """ + + buffer_id: SupportsInt + on_completion: Optional[Requestable] = None + + def _get_response_patterns(self): + return ["/done", "/b_zero", self.buffer_id], None + + def to_osc(self) -> OscMessage: + contents: List[Union[OscBundle, OscMessage, SupportsInt]] = [ + int(self.buffer_id) + ] + if self.on_completion: + contents.append(self.on_completion.to_osc()) + return OscMessage(RequestName.BUFFER_ZERO, *contents) diff --git a/supriya/contexts/responses.py b/supriya/contexts/responses.py new file mode 100644 index 000000000..ccd26c113 --- /dev/null +++ b/supriya/contexts/responses.py @@ -0,0 +1,414 @@ +import dataclasses +from typing import Dict, List, Optional, Sequence, Tuple, Type, Union + +from ..enums import NodeAction +from ..osc import OscMessage + + +@dataclasses.dataclass +class Response: + @classmethod + def from_osc(cls, osc_message: OscMessage) -> "Response": + mapping: Dict[str, Type[Response]] = { + "/b_info": BufferInfo, + "/b_set": GetBufferInfo, + "/b_setn": GetBufferRangeInfo, + "/c_set": GetControlBusInfo, + "/c_setn": GetControlBusRangeInfo, + "/d_removed": SynthDefRemovedInfo, + "/done": DoneInfo, + "/fail": FailInfo, + "/g_queryTree.reply": QueryTreeInfo, + "/n_end": NodeInfo, + "/n_go": NodeInfo, + "/n_info": NodeInfo, + "/n_move": NodeInfo, + "/n_off": NodeInfo, + "/n_on": NodeInfo, + "/n_set": GetNodeControlInfo, + "/n_setn": GetNodeControlRangeInfo, + "/status.reply": StatusInfo, + "/synced": SyncedInfo, + "/tr": TriggerInfo, + } + return mapping[osc_message.address].from_osc(osc_message) + + +@dataclasses.dataclass +class BufferInfo(Response): + """ + A ``/b_info`` response. + """ + + @dataclasses.dataclass + class Item: + buffer_id: int + frame_count: int + channel_count: int + sample_rate: float + + items: Sequence[Item] + + @classmethod + def from_osc(cls, osc_message: OscMessage) -> "Response": + return cls( + items=[ + cls.Item(*osc_message.contents[i : i + 4]) + for i in range(0, len(osc_message.contents), 4) + ] + ) + + +@dataclasses.dataclass +class DoneInfo(Response): + """ + A ``/done`` response. + """ + + command_name: str + other: Sequence[Union[float, str]] + + @classmethod + def from_osc(cls, osc_message: OscMessage) -> "Response": + command_name, *other = osc_message.contents + return cls(command_name=command_name, other=other) + + +@dataclasses.dataclass +class FailInfo(Response): + """ + A ``/fail`` response. + """ + + command_name: str + error: str + other: Sequence[Union[float, str]] + + @classmethod + def from_osc(cls, osc_message: OscMessage) -> "Response": + return cls( + command_name=osc_message.contents[0], + error=osc_message.contents[1], + other=osc_message.contents[2:], + ) + + +@dataclasses.dataclass +class GetBufferInfo(Response): + """ + A ``/b_set`` response. + """ + + buffer_id: int + items: Sequence[Tuple[int, float]] + + @classmethod + def from_osc(cls, osc_message: OscMessage) -> "Response": + buffer_id = osc_message.contents[0] + items: List[Tuple[int, float]] = [] + for i in range(1, len(osc_message.contents), 2): + index, value = osc_message.contents[i : i + 2] + items.append((int(index), float(value))) + return cls(buffer_id=buffer_id, items=items) + + +@dataclasses.dataclass +class GetBufferRangeInfo(Response): + """ + A ``/b_setn`` response. + """ + + buffer_id: int + items: Sequence[Tuple[int, Sequence[float]]] + + @classmethod + def from_osc(cls, osc_message: OscMessage) -> "Response": + buffer_id = osc_message.contents + items: List[Tuple[int, Sequence[float]]] = [] + current_index = 1 + while current_index < len(osc_message.contents): + index, count = osc_message.contents[current_index : current_index + 2] + current_index += 2 + values = osc_message.contents[current_index : current_index + count] + items.append((index, tuple(values))) + current_index += count + return cls(buffer_id=buffer_id, items=items) + + +@dataclasses.dataclass +class GetControlBusInfo(Response): + """ + A ``/c_set`` response. + """ + + items: Sequence[Tuple[int, float]] + + @classmethod + def from_osc(cls, osc_message: OscMessage) -> "Response": + items: List[Tuple[int, float]] = [] + for i in range(0, len(osc_message.contents), 2): + index, value = osc_message.contents[i : i + 2] + items.append((int(index), float(value))) + return cls(items=items) + + +@dataclasses.dataclass +class GetControlBusRangeInfo(Response): + """ + A ``/c_setn`` response. + """ + + items: Sequence[Tuple[int, Sequence[float]]] + + @classmethod + def from_osc(cls, osc_message: OscMessage) -> "Response": + items: List[Tuple[int, Sequence[float]]] = [] + current_index = 0 + while current_index < len(osc_message.contents): + index, count = osc_message.contents[current_index : current_index + 2] + current_index += 2 + values = osc_message.contents[current_index : current_index + count] + items.append((index, tuple(values))) + current_index += count + return cls(items=items) + + +@dataclasses.dataclass +class GetNodeControlInfo(Response): + node_id: int + items: Sequence[Tuple[Union[int, str], float]] + + @classmethod + def from_osc(cls, osc_message: OscMessage) -> "Response": + node_id, *rest = osc_message.contents + items: List[Tuple[Union[int, str], float]] = [] + for i in range(1, len(osc_message.contents), 2): + name_or_index, value = osc_message.contents[i : i + 2] + items.append( + ( + name_or_index + if isinstance(name_or_index, str) + else int(name_or_index), + float(value), + ) + ) + return cls(node_id=node_id, items=items) + + +@dataclasses.dataclass +class GetNodeControlRangeInfo(Response): + node_id: int + items: Sequence[Tuple[Union[int, str], Sequence[float]]] + + @classmethod + def from_osc(cls, osc_message: OscMessage) -> "Response": + node_id, *rest = osc_message.contents + items: List[Tuple[Union[int, str], Sequence[float]]] = [] + current_index = 1 + while current_index < len(osc_message.contents): + name_or_index, count = osc_message.contents[ + current_index : current_index + 2 + ] + current_index += 2 + values = osc_message.contents[current_index : current_index + count] + items.append( + ( + name_or_index + if isinstance(name_or_index, str) + else int(name_or_index), + tuple(values), + ) + ) + current_index += count + return cls(node_id=node_id, items=items) + + +@dataclasses.dataclass +class NodeInfo(Response): + action: NodeAction + node_id: int + parent_id: int + previous_id: int + next_id: int + is_group: bool + head_id: Optional[int] = None + tail_id: Optional[int] = None + + @classmethod + def from_osc(cls, osc_message: OscMessage) -> "Response": + action = osc_message.address + ( + node_id, + parent_id, + previous_id, + next_id, + is_group, + *rest, + ) = osc_message.contents + if is_group: + head_id: Optional[int] = int(rest[0]) + tail_id: Optional[int] = int(rest[1]) + else: + head_id, tail_id = None, None + return cls( + action=NodeAction.from_expr(action), + node_id=int(node_id), + parent_id=int(parent_id), + previous_id=int(previous_id), + next_id=int(next_id), + is_group=bool(is_group), + head_id=head_id, + tail_id=tail_id, + ) + + +@dataclasses.dataclass +class QueryTreeInfo(Response): + """ + A ``/g_queryTree.reply`` response. + """ + + @dataclasses.dataclass + class Item: + node_id: int + child_count: int + synthdef_name: Optional[str] = None + controls: Optional[Dict[Union[int, str], Union[float, str]]] = None + + node_id: int + child_count: int + items: Sequence[Item] + + @classmethod + def from_osc(cls, osc_message: OscMessage) -> "Response": + flag = osc_message.contents[0] + node_id = int(osc_message.contents[1]) + child_count = int(osc_message.contents[2]) + items: List["QueryTreeInfo.Item"] = [] + index = 3 + while index < len(osc_message.contents): + child_id = int(osc_message.contents[index]) + child_child_count = int(osc_message.contents[index + 1]) + synthdef_name: Optional[str] = None + controls: Optional[Dict[Union[int, str], Union[float, str]]] = None + index += 2 + if child_child_count < 0: + synthdef_name = osc_message.contents[index] + index += 1 + if flag: + controls = {} + control_count = osc_message.contents[index] + index += 1 + for i in range(control_count): + name_or_index: Union[int, str] = osc_message.contents[index] + value: Union[float, str] = osc_message.contents[index + 1] + controls[name_or_index] = value + index += 2 + items.append( + cls.Item( + node_id=child_id, + child_count=child_child_count, + controls=controls, + synthdef_name=synthdef_name, + ) + ) + return cls(node_id=node_id, child_count=child_count, items=items) + + +@dataclasses.dataclass +class StatusInfo(Response): + """ + A ``/status.reply`` response. + """ + + actual_sample_rate: float + average_cpu_usage: float + group_count: int + peak_cpu_usage: float + synth_count: int + synthdef_count: int + target_sample_rate: float + ugen_count: int + + @classmethod + def from_osc(cls, osc_message: OscMessage) -> "Response": + ( + _, + ugen_count, + synth_count, + group_count, + synthdef_count, + average_cpu_usage, + peak_cpu_usage, + target_sample_rate, + actual_sample_rate, + ) = osc_message.contents + return cls( + actual_sample_rate=actual_sample_rate, + average_cpu_usage=average_cpu_usage, + group_count=group_count, + peak_cpu_usage=peak_cpu_usage, + synth_count=synth_count, + synthdef_count=synthdef_count, + target_sample_rate=target_sample_rate, + ugen_count=ugen_count, + ) + + +@dataclasses.dataclass +class SyncedInfo(Response): + """ + A ``/synced`` response. + """ + + sync_id: int + + @classmethod + def from_osc(cls, osc_message: OscMessage) -> "Response": + return cls(*osc_message.contents) + + +@dataclasses.dataclass +class SynthDefRemovedInfo(Response): + """ + A ``/d_removed`` response. + """ + + name: str + + @classmethod + def from_osc(cls, osc_message: OscMessage) -> "Response": + return cls(*osc_message.contents) + + +@dataclasses.dataclass +class TriggerInfo(Response): + """ + A ``/tr`` response. + """ + + node_id: int + trigger_id: int + value: float + + @classmethod + def from_osc(cls, osc_message: OscMessage) -> "Response": + return cls(*osc_message.contents) + + +@dataclasses.dataclass +class VersionInfo(Response): + """ + A ``/version.reply`` response. + """ + + program_name: str + major: int + minor: int + patch: str + branch: str + commit: str + + @classmethod + def from_osc(cls, osc_message: OscMessage) -> "Response": + return cls(*osc_message.contents) diff --git a/supriya/enums.py b/supriya/enums.py index 5e8ae4ee6..d35e67f14 100644 --- a/supriya/enums.py +++ b/supriya/enums.py @@ -373,7 +373,6 @@ class RequestId(IntEnumeration): SYNTH_NEW = 9 SYNTH_NEWARGS = 59 SYNTH_NOID = 49 - SYNTH_QUERY = 65 UGEN_COMMAND = 20 VERSION = 64 @@ -436,21 +435,20 @@ class RequestName(StrictEnumeration): NODE_SET = "/n_set" NODE_SET_CONTIGUOUS = "/n_setn" NODE_TRACE = "/n_trace" - # NOTHING = None + NOTHING = None NOTIFY = "/notify" PARALLEL_GROUP_NEW = "/p_new" QUIT = "/quit" STATUS = "/status" SYNC = "/sync" SYNTHDEF_FREE = "/d_free" - # SYNTHDEF_FREE_ALL = None + SYNTHDEF_FREE_ALL = "/d_freeAll" SYNTHDEF_LOAD = "/d_load" SYNTHDEF_LOAD_DIR = "/d_loadDir" SYNTHDEF_RECEIVE = "/d_recv" SYNTH_GET = "/s_get" SYNTH_GET_CONTIGUOUS = "/s_getn" SYNTH_NEW = "/s_new" - SYNTH_QUERY = "/s_query" # SYNTH_NEWARGS = None SYNTH_NOID = "/s_noid" UGEN_COMMAND = "/u_cmd" diff --git a/supriya/querytree.py b/supriya/querytree.py index 82434929a..a1fc9407b 100644 --- a/supriya/querytree.py +++ b/supriya/querytree.py @@ -1,14 +1,15 @@ +from collections import deque from collections.abc import Sequence +from typing import TYPE_CHECKING, Deque, List, Union from supriya import ParameterRate from supriya.system import SupriyaValueObject +if TYPE_CHECKING: + from .contexts.responses import QueryTreeInfo -class QueryTreeControl(SupriyaValueObject): - ### CLASS VARIABLES ### - - __slots__ = ("_control_value", "_control_name_or_index") +class QueryTreeControl(SupriyaValueObject): ### INITIALIZER ### def __init__(self, control_name_or_index=None, control_value=None): @@ -52,10 +53,6 @@ def control_value(self): class QueryTreeSynth(SupriyaValueObject, Sequence): - ### CLASS VARIABLES ### - - __slots__ = ("_controls", "_extra", "_name", "_node_id", "_synthdef_name") - ### INITIALIZER ### def __init__( @@ -278,10 +275,6 @@ def synthdef_name(self): class QueryTreeGroup(SupriyaValueObject, Sequence): - ### CLASS VARIABLES ### - - __slots__ = ("_children", "_extra", "_name", "_node_id") - ### INITIALIZER ### def __init__(self, node_id=None, children=None, name=None, **extra): @@ -406,6 +399,39 @@ def from_group(cls, group, include_controls=False): query_tree_group = QueryTreeGroup(node_id=node_id, children=children) return query_tree_group + @classmethod + def from_query_tree_info( + cls, response: "QueryTreeInfo" + ) -> Union["QueryTreeGroup", "QueryTreeSynth"]: + from .contexts.responses import QueryTreeInfo + + def recurse( + item: QueryTreeInfo.Item, items: Deque[QueryTreeInfo.Item] + ) -> Union[QueryTreeGroup, QueryTreeSynth]: + print(item) + if item.child_count < 0: + return QueryTreeSynth( + node_id=item.node_id, + synthdef_name=item.synthdef_name, + controls=[ + QueryTreeControl( + control_name_or_index=name_or_index, control_value=value + ) + for name_or_index, value in (item.controls or {}).items() + ], + ) + children: List[Union[QueryTreeGroup, QueryTreeSynth]] = [] + for _ in range(item.child_count): + children.append(recurse(items.popleft(), items)) + return QueryTreeGroup(node_id=item.node_id, children=children) + + return recurse( + QueryTreeInfo.Item( + node_id=response.node_id, child_count=response.child_count + ), + deque(response.items), + ) + @classmethod def from_response(cls, response): return cls(node_id=response.node_id) diff --git a/supriya/typing.py b/supriya/typing.py index 33f3e462f..b8f008d10 100644 --- a/supriya/typing.py +++ b/supriya/typing.py @@ -1,6 +1,7 @@ from os import PathLike from pathlib import Path from typing import ( + TYPE_CHECKING, Callable, Coroutine, Dict, @@ -18,6 +19,9 @@ from .enums import AddAction, CalculationRate, HeaderFormat, SampleFormat +if TYPE_CHECKING: + from .osc import OscBundle, OscMessage + class Default: pass @@ -27,6 +31,11 @@ class Missing: pass +class SupportsOsc(Protocol): + def to_osc(self) -> Union["OscBundle", "OscMessage"]: + ... + + class SupportsRender(Protocol): def __render__( self, diff --git a/tests/contexts/conftest.py b/tests/contexts/conftest.py new file mode 100644 index 000000000..126da1459 --- /dev/null +++ b/tests/contexts/conftest.py @@ -0,0 +1,8 @@ +import logging + +import pytest + + +@pytest.fixture(autouse=True) +def capture_logs(caplog): + caplog.set_level(logging.INFO, logger="supriya") diff --git a/tests/contexts/test_RealtimeContext_buffers.py b/tests/contexts/test_RealtimeContext_buffers.py index 2811779b4..0de1dae25 100644 --- a/tests/contexts/test_RealtimeContext_buffers.py +++ b/tests/contexts/test_RealtimeContext_buffers.py @@ -7,10 +7,10 @@ import pytest_asyncio from supriya import assets, default -from supriya.commands import BufferInfo from supriya.contexts.core import MomentClosed from supriya.contexts.realtime import AsyncServer, Server from supriya.osc import OscBundle, OscMessage +from supriya.contexts.responses import BufferInfo async def get(x): @@ -250,14 +250,14 @@ async def test_query_buffer(context): buffer = context.add_buffer(channel_count=channel_count, frame_count=frame_count) await asyncio.sleep(0.1) assert await get(buffer.query()) == BufferInfo( - items=( + items=[ BufferInfo.Item( buffer_id=0, frame_count=frame_count, channel_count=channel_count, sample_rate=44100.0, - ), - ) + ) + ] ) diff --git a/tests/contexts/test_RealtimeContext_nodes.py b/tests/contexts/test_RealtimeContext_nodes.py index c16e7da4f..1f723122d 100644 --- a/tests/contexts/test_RealtimeContext_nodes.py +++ b/tests/contexts/test_RealtimeContext_nodes.py @@ -6,10 +6,10 @@ from uqbar.strings import normalize from supriya import default -from supriya.commands import NodeInfo from supriya.contexts.realtime import AsyncServer, Server from supriya.enums import NodeAction from supriya.osc import OscBundle, OscMessage +from supriya.contexts.responses import NodeInfo async def get(x): @@ -344,7 +344,14 @@ async def test_query_node(context): group = context.add_group() await asyncio.sleep(0.1) assert await get(group.query()) == NodeInfo( - action=NodeAction.NODE_QUERIED, is_group=True, node_id=1000, parent_id=1 + action=NodeAction.NODE_QUERIED, + head_id=-1, + is_group=True, + next_id=-1, + node_id=1000, + parent_id=1, + previous_id=-1, + tail_id=-1, )