-
Notifications
You must be signed in to change notification settings - Fork 1k
/
Copy pathtest_datasets.py
177 lines (157 loc) · 8.59 KB
/
test_datasets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import textwrap
from galaxy_test.base.populators import (
DatasetCollectionPopulator,
DatasetPopulator,
skip_without_tool,
)
from ._framework import ApiTestCase
class DatasetsApiTestCase(ApiTestCase):
def setUp(self):
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor)
self.history_id = self.dataset_populator.new_history()
def test_index(self):
index_response = self._get("datasets")
self._assert_status_code_is(index_response, 200)
def test_search_datasets(self):
hda_id = self.dataset_populator.new_dataset(self.history_id)['id']
payload = {'limit': 1, 'offset': 0}
index_response = self._get("datasets", payload).json()
assert len(index_response) == 1
assert index_response[0]['id'] == hda_id
hdca_id = self.dataset_collection_populator.create_list_in_history(self.history_id,
contents=["1\n2\n3"]).json()['id']
payload = {'limit': 3, 'offset': 0}
index_response = self._get("datasets", payload).json()
assert len(index_response) == 3
assert index_response[0]['id'] == hdca_id
assert index_response[0]['history_content_type'] == 'dataset_collection'
assert index_response[2]['id'] == hda_id
assert index_response[2]['history_content_type'] == 'dataset'
payload = {'limit': 2, 'offset': 0, 'q': ['history_content_type'], 'qv': ['dataset']}
index_response = self._get("datasets", payload).json()
assert index_response[1]['id'] == hda_id
def test_search_by_tag(self):
hda_id = self.dataset_populator.new_dataset(self.history_id)['id']
update_payload = {
'tags': ['cool:new_tag', 'cool:another_tag'],
}
updated_hda = self._put(
f"histories/{self.history_id}/contents/{hda_id}",
update_payload).json()
assert 'cool:new_tag' in updated_hda['tags']
assert 'cool:another_tag' in updated_hda['tags']
payload = {'limit': 10, 'offset': 0, 'q': ['history_content_type', 'tag'], 'qv': ['dataset', 'cool:new_tag']}
index_response = self._get("datasets", payload).json()
assert len(index_response) == 1
payload = {'limit': 10, 'offset': 0, 'q': ['history_content_type', 'tag-contains'],
'qv': ['dataset', 'new_tag']}
index_response = self._get("datasets", payload).json()
assert len(index_response) == 1
payload = {'limit': 10, 'offset': 0, 'q': ['history_content_type', 'tag-contains'], 'qv': ['dataset', 'notag']}
index_response = self._get("datasets", payload).json()
assert len(index_response) == 0
def test_search_by_tool_id(self):
self.dataset_populator.new_dataset(self.history_id)
payload = {'limit': 1, 'offset': 0, 'q': ['history_content_type', 'tool_id'], 'qv': ['dataset', 'upload1']}
assert len(self._get("datasets", payload).json()) == 1
payload = {'limit': 1, 'offset': 0, 'q': ['history_content_type', 'tool_id'], 'qv': ['dataset', 'uploadX']}
assert len(self._get("datasets", payload).json()) == 0
payload = {'limit': 1, 'offset': 0, 'q': ['history_content_type', 'tool_id-contains'], 'qv': ['dataset', 'pload1']}
assert len(self._get("datasets", payload).json()) == 1
self.dataset_collection_populator.create_list_in_history(self.history_id,
name="search by tool id",
contents=["1\n2\n3"]).json()
self.dataset_populator.wait_for_history(self.history_id)
payload = {'limit': 10, 'offset': 0, 'history_id': self.history_id, 'q': ['name', 'tool_id'],
'qv': ['search by tool id', 'upload1']}
result = self._get("datasets", payload).json()
assert result[0]['name'] == 'search by tool id', result
payload = {'limit': 1, 'offset': 0, 'q': ['history_content_type', 'tool_id'],
'qv': ['dataset_collection', 'uploadX']}
result = self._get("datasets", payload).json()
assert len(result) == 0
def test_invalid_search(self):
payload = {'limit': 10, 'offset': 0, 'q': ['history_content_type', 'tag-invalid_op'], 'qv': ['dataset', 'notag']}
index_response = self._get("datasets", payload)
self._assert_status_code_is(index_response, 400)
assert index_response.json()['err_msg'] == 'bad op in filter'
def test_search_returns_only_accessible(self):
hda_id = self.dataset_populator.new_dataset(self.history_id)['id']
with self._different_user():
payload = {'limit': 10, 'offset': 0, 'q': ['history_content_type'], 'qv': ['dataset']}
index_response = self._get("datasets", payload).json()
for item in index_response:
assert hda_id != item['id']
def test_show(self):
hda1 = self.dataset_populator.new_dataset(self.history_id)
show_response = self._get("datasets/%s" % (hda1["id"]))
self._assert_status_code_is(show_response, 200)
self.__assert_matches_hda(hda1, show_response.json())
def __assert_matches_hda(self, input_hda, query_hda):
self._assert_has_keys(query_hda, "id", "name")
assert input_hda["name"] == query_hda["name"]
assert input_hda["id"] == query_hda["id"]
def test_display(self):
contents = textwrap.dedent("""\
1 2 3 4
A B C D
10 20 30 40
""")
hda1 = self.dataset_populator.new_dataset(self.history_id, content=contents)
self.dataset_populator.wait_for_history(self.history_id)
display_response = self._get("histories/{}/contents/{}/display".format(self.history_id, hda1["id"]), {
'raw': 'True'
})
self._assert_status_code_is(display_response, 200)
assert display_response.text == contents
def test_tag_change(self):
hda_id = self.dataset_populator.new_dataset(self.history_id)['id']
payload = {
'item_id': hda_id,
'item_class': 'HistoryDatasetAssociation',
'item_tags': ['cool:tag_a', 'cool:tag_b', 'tag_c', 'name:tag_d', '#tag_e'],
}
self._put("tags", payload).json()
updated_hda = self._get(
f"histories/{self.history_id}/contents/{hda_id}").json()
assert 'cool:tag_a' in updated_hda['tags']
assert 'cool:tag_b' in updated_hda['tags']
assert 'tag_c' in updated_hda['tags']
assert 'name:tag_d' in updated_hda['tags']
assert 'name:tag_e' in updated_hda['tags']
    @skip_without_tool("cat_data_and_sleep")
    def test_update_datatype(self):
        """Datatype updates are rejected while a job writes the dataset,
        succeed once jobs finish, and reject unknown datatype names.
        """
        hda_id = self.dataset_populator.new_dataset(self.history_id)['id']
        original_hda = self._get(
            f"histories/{self.history_id}/contents/{hda_id}").json()
        # Fresh uploads are plain text; the scatterplot visualization is not
        # offered for text datasets (but is for tabular — asserted below).
        assert original_hda['extension'] == 'txt'
        assert original_hda['data_type'] == 'galaxy.datatypes.data.Text'
        assert 'scatterplot' not in [viz['name'] for viz in original_hda['visualizations']]
        # Run a slow tool so its output dataset stays in a non-terminal state
        # long enough to attempt an update against it.
        inputs = {
            'input1': {'src': 'hda', 'id': hda_id},
            'sleep_time': 10,
        }
        run_response = self.dataset_populator.run_tool(
            "cat_data_and_sleep",
            inputs,
            self.history_id,
            assert_ok=False,
        )
        queued_id = run_response.json()["outputs"][0]["id"]
        update_while_incomplete_response = self._put(  # try updating datatype while used as output of a running job
            f"histories/{self.history_id}/contents/{queued_id}",
            {'datatype': 'tabular'})
        self._assert_status_code_is(update_while_incomplete_response, 400)
        self.dataset_populator.wait_for_history_jobs(self.history_id)  # now wait for upload to complete
        successful_updated_hda_response = self._put(
            f"histories/{self.history_id}/contents/{hda_id}",
            {'datatype': 'tabular'}).json()
        # A successful datatype switch updates extension, data_type, and the
        # set of applicable visualizations.
        assert successful_updated_hda_response['extension'] == 'tabular'
        assert successful_updated_hda_response['data_type'] == 'galaxy.datatypes.tabular.Tabular'
        assert 'scatterplot' in [viz['name'] for viz in successful_updated_hda_response['visualizations']]
        invalidly_updated_hda_response = self._put(  # try updating with invalid datatype
            f"histories/{self.history_id}/contents/{hda_id}",
            {'datatype': 'invalid'})
        self._assert_status_code_is(invalidly_updated_hda_response, 400)