-
Notifications
You must be signed in to change notification settings - Fork 2
/
locustfile.py
140 lines (123 loc) · 5.54 KB
/
locustfile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
'''
Copyright 2023
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
from flask.globals import request
from locust import HttpUser, TaskSet, task, between
from config import Config
from random import randrange
import logging
import time
import warnings
warnings.filterwarnings("ignore")
ORGANISATION_ID = '1'
LABEL_ID = 1
class TagBatch(TaskSet):
'''
Batch-tagging class for locust. Emulates a user tagging by taking a job from a randomly assigned active task, recieves clusters,
and starts assigning the specified label to each.
'''
wait_time = between(1,2)
host = 'https://'+Config.DNS
def on_start(self):
'''Initiales the tagging of a batch be getting a job, and initialises all variables.'''
self.username = self.client.get('/get_username').json()
reply = self.client.get('/get_available_task/'+ORGANISATION_ID).json()
if reply == 'inactive':
logging.info('{}: No tasks launched. Stopping.'.format(self.username))
# self.environment.runner.quit()
time.sleep(36000)
else:
logging.info('Received task {}'.format(reply))
self.task_id = reply
self.label_id = LABEL_ID
self.client.verify = False
reply = self.client.get('/takeJob/'+str(self.task_id), name="/takeJob").json()
if reply['status'] == 'success':
self.client.get(reply['code'], name="/dotask")
self.clusters = []
self.clusterIdList = []
self.clusterIndex = 0
self.call_count = 0
self.awaiting_clusters = False
self.ping()
self.update_clusters()
elif reply['status'] == 'inactive':
logging.info('{}: Task {} finished. Stopping.'.format(self.username,self.task_id))
# self.environment.runner.quit()
time.sleep(36000)
else:
logging.info('{}: No job available for task {}. Sleeping.'.format(self.username,self.task_id))
time.sleep(30)
self.interrupt()
def update_clusters(self):
'''Requests more clusters from the server if the user is getting close to the end of their queue.'''
required = 5-(len(self.clusters)-(self.clusterIndex+1))
if (required > 0) and (not self.awaiting_clusters):
self.awaiting_clusters = True
reply = self.client.get('/getCluster?task=0&reqId='+str(randrange(100000)), name="/getCluster")
try:
reply = reply.json()
if 'info' in reply.keys():
newClusters = reply['info']
for newCluster in newClusters:
if (newCluster['id'] not in self.clusterIdList) or (str(newCluster['id']) == '-101'):
self.clusterIdList.append(newCluster['id'])
self.clusters.append(newCluster)
self.awaiting_clusters = False
elif 'redirect' in reply.keys():
logging.info('{}: Batch done!'.format(self.username))
self.client.get(reply['redirect'])
self.interrupt()
else:
logging.info('{}: Unexpected /getCluster response: {}'.format(self.username,reply))
self.awaiting_clusters = False
except:
logging.info('{}: Unexpected /getCluster response: {}'.format(self.username,reply))
self.awaiting_clusters = False
def ping(self):
'''Pings the server to let it know the user is active.'''
self.client.post('/ping')
@task()
def label_cluster(self):
'''Labels the next cluster. Updates the cluster queue and pings the server every now and then.'''
self.call_count += 1
if (self.call_count % 3) == 0:
self.update_clusters()
if (self.call_count % 20) == 0:
self.ping()
if self.clusterIndex < len(self.clusters):
if self.clusters[self.clusterIndex]['id'] == '-99':
logging.info('{}: -99 encountered.'.format(self.username))
elif self.clusters[self.clusterIndex]['id'] == '-101':
logging.info('{}: Batch done!'.format(self.username))
self.client.get('/done')
self.interrupt()
else:
self.client.post(
"/assignLabel/" + str(self.clusters[self.clusterIndex]['id']),
data={"labels": str([self.label_id])},
name="/assignLabel"
)
self.clusterIndex += 1
else:
logging.info('{}: Failed to assign label: no clusters available.'.format(self.username))
class Tagger(HttpUser):
'''The user class emulatting somebody tagging clusters.'''
wait_time = between(15,30)
host = 'https://'+Config.DNS
def on_start(self):
'''Requests a user, and logs in.'''
self.client.verify = False
self.client.post('/load_login/'+ORGANISATION_ID)
tasks = {
TagBatch: 1
}