-
Notifications
You must be signed in to change notification settings - Fork 13
/
client.py
93 lines (76 loc) · 3.95 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import asyncio
import time
from pydicom import Dataset
from scp import ModalityStoreSCP
class SeriesCollector:
"""A Series Collector is used to build up a list of instances (a DICOM series) as they are received by the modality.
It stores the (during collection incomplete) series, the Series (Instance) UID, the time the series was last updated
with a new instance and the information whether the dispatch of the series was started.
"""
def __init__(self, first_dataset: Dataset) -> None:
"""Initialization of the Series Collector with the first dataset (instance).
Args:
first_dataset (Dataset): The first dataset or the regarding series received from the modality.
"""
self.series_instance_uid = first_dataset.SeriesInstanceUID
self.series: list[Dataset] = [first_dataset]
self.last_update_time = time.time()
self.dispatch_started = False
def add_instance(self, dataset: Dataset) -> bool:
"""Add an dataset to the series collected by this Series Collector if it has the correct Series UID.
Args:
dataset (Dataset): The dataset to add.
Returns:
bool: `True`, if the Series UID of the dataset to add matched and the dataset was therefore added, `False` otherwise.
"""
if self.series_instance_uid == dataset.SeriesInstanceUID:
self.series.append(dataset)
self.last_update_time = time.time()
return True
return False
class SeriesDispatcher:
"""This code provides a template for receiving data from a modality using DICOM.
Be sure to understand how it works, then try to collect incoming series (hint: there is no attribute indicating how
many instances are in a series, so you have to wait for some time to find out if a new instance is transmitted).
For simplyfication, you can assume that only one series is transmitted at a time.
You can use the given template, but you don't have to!
"""
def __init__(self) -> None:
"""Initialize the Series Dispatcher.
"""
self.loop: asyncio.AbstractEventLoop
self.modality_scp = ModalityStoreSCP()
self.series_collector = None
async def main(self) -> None:
"""An infinitely running method used as hook for the asyncio event loop.
Keeps the event loop alive whether or not datasets are received from the modality and prints a message
regulary when no datasets are received.
"""
while True:
# TODO: Regulary check if new datasets are received and act if they are.
# Information about Python asyncio: https://docs.python.org/3/library/asyncio.html
# When datasets are received you should collect and process them
# (e.g. using `asyncio.create_task(self.run_series_collector()`)
print("Waiting for Modality")
await asyncio.sleep(0.2)
async def run_series_collectors(self) -> None:
"""Runs the collection of datasets, which results in the Series Collector being filled.
"""
# TODO: Get the data from the SCP and start dispatching
pass
async def dispatch_series_collector(self) -> None:
"""Tries to dispatch a Series Collector, i.e. to finish it's dataset collection and scheduling of further
methods to extract the desired information.
"""
# Check if the series collector hasn't had an update for a long enough timespan and send the series to the
# server if it is complete
# NOTE: This is the last given function, you should create more for extracting the information and
# sending the data to the server
maximum_wait_time = 1
pass
if __name__ == "__main__":
"""Create a Series Dispatcher object and run it's infinite `main()` method in a event loop.
"""
engine = SeriesDispatcher()
engine.loop = asyncio.get_event_loop()
engine.loop.run_until_complete(engine.main())