-
Notifications
You must be signed in to change notification settings - Fork 23
/
default.py
1365 lines (1077 loc) · 54.3 KB
/
default.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# pylint: disable=protected-access
__copyright__ = "Copyright 2013-2016, http://radical.rutgers.edu"
__license__ = "MIT"
import os
import copy
import math
import time
import pprint
import shutil
import tempfile
import radical.saga as rs
import radical.saga.filesystem as rsfs
import radical.utils as ru
from ... import states as rps
from ... import constants as rpc
from .base import PMGRLaunchingComponent
from ...staging_directives import complete_url
# ------------------------------------------------------------------------------
# local constants
#
# fallback values used when the resource config does not specify them
DEFAULT_AGENT_SPAWNER = 'POPEN'
DEFAULT_RP_VERSION    = 'local'
DEFAULT_VIRTENV_MODE  = 'update'
DEFAULT_VIRTENV_DIST  = 'default'
DEFAULT_AGENT_CONFIG  = 'default'

JOB_CANCEL_DELAY      = 120  # seconds between cancel signal and job kill
JOB_CHECK_INTERVAL    = 60   # seconds between runs of the job state check loop
JOB_CHECK_MAX_MISSES  = 3    # number of times to find a job missing before
                             # declaring it dead

LOCAL_SCHEME          = 'file'            # URL scheme for client-local files
BOOTSTRAPPER_0        = "bootstrap_0.sh"  # first-stage bootstrapper script name
# ------------------------------------------------------------------------------
#
class Default(PMGRLaunchingComponent):
# --------------------------------------------------------------------------
#
def __init__(self, cfg, session):
    """Create the launching component; all setup happens in `initialize`."""
    super().__init__(cfg, session)
# --------------------------------------------------------------------------
#
def initialize(self):
    """
    Set up component state, register the work queue input, the pilot
    watcher callback and the control pubsub subscription.

    We don't really have an output queue, as we pass control over the
    pilot jobs to the resource management system (ResourceManager).
    """
    self._pilots        = dict()      # dict for all known pilots
    self._pilots_lock   = ru.RLock()  # lock on manipulating the above
    self._checking      = list()      # pilots to check state on
    self._check_lock    = ru.RLock()  # lock on manipulating the above
    self._saga_fs_cache = dict()      # cache of saga directories
    self._saga_js_cache = dict()      # cache of saga job services
    self._sandboxes     = dict()      # cache of resource sandbox URLs
    self._cache_lock    = ru.RLock()  # lock for cache

    self._mod_dir  = os.path.dirname(os.path.abspath(__file__))
    self._root_dir = "%s/../../" % self._mod_dir

    self.register_input(rps.PMGR_LAUNCHING_PENDING,
                        rpc.PMGR_LAUNCHING_QUEUE, self.work)

    # FIXME: make interval configurable
    self.register_timed_cb(self._pilot_watcher_cb, timer=10.0)

    # we listen for pilot cancel and input staging commands
    self.register_subscriber(rpc.CONTROL_PUBSUB, self._pmgr_control_cb)

    # query version information only once (it was previously queried twice,
    # once for logging and once for unpacking)
    version_info = ru.get_version([self._mod_dir, self._root_dir])
    self._log.info(version_info)
    self._rp_version, _, _, _, self._rp_sdist_name, self._rp_sdist_path = \
        version_info
# --------------------------------------------------------------------------
#
def finalize(self):
    """
    Tear the component down: stop the watcher callback, deregister the
    input queue, cancel and kill all remaining pilots, and close all
    cached SAGA job services.  Errors during finalization are logged,
    never raised.
    """
    try:
        self.unregister_timed_cb(self._pilot_watcher_cb)
        self.unregister_input(rps.PMGR_LAUNCHING_PENDING,
                              rpc.PMGR_LAUNCHING_QUEUE, self.work)

        # FIXME: always kill all saga jobs for non-final pilots at
        #        termination, and set the pilot states to CANCELED.  This
        #        will conflict with disconnect/reconnect semantics.
        with self._pilots_lock:
            pids = list(self._pilots.keys())

        self._cancel_pilots(pids)
        self._kill_pilots(pids)

        with self._cache_lock:
            for url, js in self._saga_js_cache.items():
                self._log.debug('close js %s', url)
                js.close()

    # narrowed from a bare `except:` -- do not swallow SystemExit or
    # KeyboardInterrupt during shutdown
    except Exception:
        self._log.exception('finalization error')
# --------------------------------------------------------------------------
#
def _pmgr_control_cb(self, topic, msg):
cmd = msg['cmd']
arg = msg['arg']
self._log.debug('launcher got %s', msg)
if cmd == 'pilot_staging_input_request':
self._handle_pilot_input_staging(arg['pilot'], arg['sds'])
if cmd == 'pilot_staging_output_request':
self._handle_pilot_output_staging(arg['pilot'], arg['sds'])
elif cmd == 'cancel_pilots':
# on cancel_pilot requests, we forward the DB entries via MongoDB,
# by pushing a pilot update. We also mark the pilot for
# cancelation, so that the pilot watcher can cancel the job after
# JOB_CANCEL_DELAY seconds, in case the pilot did not react on the
# command in time.
pmgr = arg['pmgr']
pids = arg['uids']
if pmgr != self._pmgr:
# this request is not for us to enact
return True
if not isinstance(pids, list):
pids = [pids]
self._log.info('received pilot_cancel command (%s)', pids)
self._cancel_pilots(pids)
return True
# --------------------------------------------------------------------------
#
def _handle_pilot_input_staging(self, pilot, sds):
    """
    Enact pilot-level input staging directives (transfers into the pilot
    sandbox) and report the combined result back on the control pubsub.

    Arguments:
        pilot (dict): pilot dict; must provide 'uid', 'client_sandbox',
            'pilot_sandbox' and 'resource_sandbox'.
        sds (list): staging directive dicts ('action', 'flags', 'uid',
            'source', 'target'); their 'state' is set in place.

    Raises:
        ValueError: only `rpc.TRANSFER` is valid on pilot level; COPY,
            LINK and MOVE abort the whole request.
    """
    pid = pilot['uid']

    # URL expansion contexts: input sources are relative to the client
    # sandbox, targets to the pilot sandbox.
    # NOTE: no unit sandboxes defined!
    src_context = {'pwd'     : pilot['client_sandbox'],
                   'pilot'   : pilot['pilot_sandbox'],
                   'resource': pilot['resource_sandbox']}
    tgt_context = {'pwd'     : pilot['pilot_sandbox'],
                   'pilot'   : pilot['pilot_sandbox'],
                   'resource': pilot['resource_sandbox']}

    # Iterate over all directives
    for sd in sds:

        # TODO: respect flags in directive
        action = sd['action']
        flags  = sd['flags']
        did    = sd['uid']
        src    = sd['source']
        tgt    = sd['target']

        assert(action in [rpc.COPY, rpc.LINK, rpc.MOVE, rpc.TRANSFER])

        self._prof.prof('staging_in_start', uid=pid, msg=did)

        src = complete_url(src, src_context, self._log)
        tgt = complete_url(tgt, tgt_context, self._log)

        # only TRANSFER is supported on pilot level -- a failed directive
        # aborts the remaining ones
        if action in [rpc.COPY, rpc.LINK, rpc.MOVE]:
            self._prof.prof('staging_in_fail', uid=pid, msg=did)
            raise ValueError("invalid action '%s' on pilot level" % action)

        self._log.info('transfer %s to %s', src, tgt)

        # FIXME: make sure that tgt URL points to the right resource
        # FIXME: honor sd flags if given (recursive...)
        # NOTE(review): the directive's own `flags` value is discarded here
        flags = rsfs.CREATE_PARENTS
        if os.path.isdir(src.path):
            flags |= rsfs.RECURSIVE

        # Define and open the staging directory for the pilot.
        # We use the target dir construct here, so that we can create
        # the directory if it does not yet exist.

        # url used for cache (sandbox url w/o path)
        fs_url      = rs.Url(pilot['pilot_sandbox'])
        fs_url.path = '/'
        key         = str(fs_url)

        self._log.debug("rs.file.Directory ('%s')", key)

        # reuse a cached filesystem handle for this endpoint, or create
        # and cache a new one
        with self._cache_lock:
            if key in self._saga_fs_cache:
                fs = self._saga_fs_cache[key]
            else:
                fs = rsfs.Directory(fs_url, session=self._session)
                self._saga_fs_cache[key] = fs

        fs.copy(src, tgt, flags=flags)

        sd['state'] = rps.DONE

        self._prof.prof('staging_in_stop', uid=pid, msg=did)

    # report the (successful) result for the whole directive set at once
    self.publish(rpc.CONTROL_PUBSUB, {'cmd': 'pilot_staging_input_result',
                                      'arg': {'pilot': pilot,
                                              'sds'  : sds}})
# --------------------------------------------------------------------------
#
def _handle_pilot_output_staging(self, pilot, sds):
    """
    Enact pilot-level output staging directives (transfers from the pilot
    sandbox toward the client) and report the result of each directive
    individually on the control pubsub.

    Arguments:
        pilot (dict): pilot dict; must provide 'uid', 'client_sandbox',
            'pilot_sandbox' and 'resource_sandbox'.
        sds (list): staging directive dicts; 'state' is set in place to
            `rps.DONE` or `rps.FAILED`.

    Unlike input staging, a failing directive does not abort the
    remaining ones -- this is best-effort per directive.
    """
    pid = pilot['uid']

    # URL expansion contexts: output sources are relative to the pilot
    # sandbox, targets to the client sandbox.
    # NOTE: no unit sandboxes defined!
    src_context = {'pwd'     : pilot['pilot_sandbox'],
                   'pilot'   : pilot['pilot_sandbox'],
                   'resource': pilot['resource_sandbox']}
    tgt_context = {'pwd'     : pilot['client_sandbox'],
                   'pilot'   : pilot['pilot_sandbox'],
                   'resource': pilot['resource_sandbox']}

    # Iterate over all directives
    for sd in sds:

        try:
            action = sd['action']
            flags  = sd['flags']
            did    = sd['uid']
            src    = sd['source']
            tgt    = sd['target']

            assert(action in [rpc.COPY, rpc.LINK, rpc.MOVE, rpc.TRANSFER])

            self._prof.prof('staging_out_start', uid=pid, msg=did)

            # only TRANSFER is supported on pilot level
            if action in [rpc.COPY, rpc.LINK, rpc.MOVE]:
                raise ValueError("invalid pilot action '%s'" % action)

            src = complete_url(src, src_context, self._log)
            tgt = complete_url(tgt, tgt_context, self._log)

            self._log.info('transfer %s to %s', src, tgt)

            # FIXME: make sure that tgt URL points to the right resource
            # FIXME: honor sd flags if given (recursive...)
            # NOTE(review): the directive's own `flags` value is discarded
            flags = rsfs.CREATE_PARENTS
            if os.path.isdir(src.path):
                flags |= rsfs.RECURSIVE

            # Define and open the staging directory for the pilot
            # url used for cache (sandbox url w/o path)
            fs_url      = rs.Url(pilot['pilot_sandbox'])
            fs_url.path = '/'
            key         = str(fs_url)

            # reuse a cached filesystem handle, or create and cache one
            with self._cache_lock:
                if key in self._saga_fs_cache:
                    fs = self._saga_fs_cache[key]
                else:
                    fs = rsfs.Directory(fs_url, session=self._session)
                    self._saga_fs_cache[key] = fs

            fs.copy(src, tgt, flags=flags)

            sd['state'] = rps.DONE
            self._prof.prof('staging_out_stop', uid=pid, msg=did)

        # deliberately broad: any failure marks this directive FAILED and
        # continues with the next one
        except:
            self._log.exception('pilot level staging failed')
            self._prof.prof('staging_out_fail', uid=pid, msg=did)
            sd['state'] = rps.FAILED

        # report the result for this single directive
        self.publish(rpc.CONTROL_PUBSUB,
                     {'cmd': 'pilot_staging_output_result',
                      'arg': {'pilot': pilot,
                              'sds'  : [sd]}})
# --------------------------------------------------------------------------
#
def _pilot_watcher_cb(self):
    """
    Periodic callback (see `initialize`): poll the SAGA job states of all
    watched pilots, advance pilots whose jobs reached a final state, and
    forcefully kill pilots whose cancellation request is overdue by more
    than `JOB_CANCEL_DELAY` seconds.

    Returns:
        bool: always `True`, to keep the timed callback registered.
    """
    # FIXME: we should actually use SAGA job state notifications!
    # FIXME: check how race conditions are handled: we may detect
    #        a finalized SAGA job and change the pilot state -- but that
    #        pilot may have transitioned into final state via the normal
    #        notification mechanism already.  That probably should be
    #        sorted out by the pilot manager, which will receive
    #        notifications for both transitions.  As long as the final
    #        state is the same, there should be no problem anyway.  If it
    #        differs, the 'cleaner' final state should prevail, in this
    #        ordering:
    #              cancel
    #              timeout
    #              error
    #              disappeared
    #        This implies that we want to communicate 'final_cause'

    # we don't want to lock our members all the time.  For that reason we
    # use a copy of the pilots_tocheck list and iterate over that, and only
    # lock other members when they are manipulated.
    tc = rs.job.Container()
    with self._pilots_lock, self._check_lock:

        for pid in self._checking:
            tc.add(self._pilots[pid]['job'])

    # bulk query to warm the SAGA state cache (see note below)
    states = tc.get_states()

    self._log.debug('bulk states: %s', states)

    # if none of the states is final, we have nothing to do.
    # We can't rely on the ordering of tasks and states in the task
    # container, so we hope that the task container's bulk state query lead
    # to a caching of state information, and we thus have cache hits when
    # querying the pilots individually
    final_pilots = list()
    with self._pilots_lock, self._check_lock:

        for pid in self._checking:

            state = self._pilots[pid]['job'].state
            self._log.debug('saga job state: %s %s %s', pid, self._pilots[pid]['job'], state)

            if state in [rs.job.DONE, rs.job.FAILED, rs.job.CANCELED]:
                pilot = self._pilots[pid]['pilot']
                # map the SAGA job state onto the pilot state
                if state == rs.job.DONE    : pilot['state'] = rps.DONE
                if state == rs.job.FAILED  : pilot['state'] = rps.FAILED
                if state == rs.job.CANCELED: pilot['state'] = rps.CANCELED
                final_pilots.append(pilot)

    if final_pilots:

        for pilot in final_pilots:

            with self._check_lock:
                # stop monitoring this pilot
                self._checking.remove(pilot['uid'])

            self._log.debug('final pilot %s %s', pilot['uid'], pilot['state'])

        self.advance(final_pilots, push=False, publish=True)

    # all checks are done, final pilots are weeded out.  Now check if any
    # pilot is scheduled for cancellation and is overdue, and kill it
    # forcefully.
    to_cancel = list()
    with self._pilots_lock:

        for pid in self._pilots:

            pilot   = self._pilots[pid]['pilot']
            time_cr = pilot.get('cancel_requested')

            # check if the pilot is final meanwhile
            if pilot['state'] in rps.FINAL:
                continue

            if time_cr and time_cr + JOB_CANCEL_DELAY < time.time():
                self._log.debug('pilot needs killing: %s :  %s + %s < %s',
                                pid, time_cr, JOB_CANCEL_DELAY, time.time())
                # drop the marker so the pilot is only killed once
                del(pilot['cancel_requested'])
                self._log.debug(' cancel pilot %s', pid)
                to_cancel.append(pid)

    if to_cancel:
        self._kill_pilots(to_cancel)

    return True
# --------------------------------------------------------------------------
#
def _cancel_pilots(self, pids):
'''
Send a cancellation request to the pilots. This call will not wait for
the request to get enacted, nor for it to arrive, but just send it.
'''
if not pids or not self._pilots:
# nothing to do
return
# recod time of request, so that forceful termination can happen
# after a certain delay
now = time.time()
with self._pilots_lock:
for pid in pids:
if pid in self._pilots:
self._log.debug('update cancel req: %s %s', pid, now)
self._pilots[pid]['pilot']['cancel_requested'] = now
# --------------------------------------------------------------------------
#
def _kill_pilots(self, pids):
    '''
    Forcefully kill a set of pilots.  For pilots which have just recently
    been cancelled, we will wait a certain amount of time to give them a
    chance to terminate on their own (which allows them to flush profiles
    and logfiles, etc).  After that delay, we'll make sure they get
    killed.

    NOTE(review): early exits return `None` while the normal path returns
    `True`; callers do not use the return value.
    '''
    self._log.debug('killing pilots: %s', pids)

    if not pids or not self._pilots:
        # nothing to do
        return

    # find the most recent cancellation request
    with self._pilots_lock:
        self._log.debug('killing pilots: %s',
                        [p['pilot'].get('cancel_requested', 0)
                         for p in list(self._pilots.values())])
        last_cancel = max([p['pilot'].get('cancel_requested', 0)
                           for p in list(self._pilots.values())])

    self._log.debug('killing pilots: last cancel: %s', last_cancel)

    # we wait for up to JOB_CANCEL_DELAY for a pilot to terminate on its
    # own; pilots which reach a final state meanwhile are dropped from the
    # kill list
    while time.time() < (last_cancel + JOB_CANCEL_DELAY):

        self._log.debug('killing pilots: check %s < %s + %s',
                        time.time(), last_cancel, JOB_CANCEL_DELAY)

        alive_pids = list()
        for pid in pids:

            if pid not in self._pilots:
                self._log.error('unknown: %s', pid)
                raise ValueError('unknown pilot %s' % pid)

            pilot = self._pilots[pid]['pilot']
            if pilot['state'] not in rps.FINAL:
                self._log.debug('killing pilots: alive %s', pid)
                alive_pids.append(pid)
            else:
                self._log.debug('killing pilots: dead  %s', pid)

        pids = alive_pids
        if not alive_pids:
            # nothing to do anymore
            return

        # avoid busy poll
        time.sleep(1)

    to_advance = list()

    # we don't want the watcher checking for these pilots anymore
    with self._check_lock:
        for pid in pids:
            if pid in self._checking:
                self._checking.remove(pid)

    self._log.debug('killing pilots: kill! %s', pids)
    try:
        with self._pilots_lock:
            tc = rs.job.Container()
            for pid in pids:

                if pid not in self._pilots:
                    self._log.error('unknown: %s', pid)
                    raise ValueError('unknown pilot %s' % pid)

                pilot = self._pilots[pid]['pilot']
                job   = self._pilots[pid]['job']

                # don't overwrite resource_details from the agent
                #
                if 'resource_details' in pilot:
                    del(pilot['resource_details'])

                if pilot['state'] in rps.FINAL:
                    continue

                self._log.debug('plan cancellation of %s : %s', pilot, job)
                to_advance.append(pilot)
                self._log.debug('request cancel for %s', pilot['uid'])
                tc.add(job)

            self._log.debug('cancellation start')
            tc.cancel()
            tc.wait()
            self._log.debug('cancellation done')

        # set canceled state
        self.advance(to_advance, state=rps.CANCELED, push=False, publish=True)

    except Exception:
        self._log.exception('pilot kill failed')

    return True
# --------------------------------------------------------------------------
#
def work(self, pilots):
    """
    Accept pilots for launching: advance them to PMGR_LAUNCHING, group
    them by target resource and access schema, and submit each group as
    one bulk.  A failed bulk submission marks all pilots of that bulk as
    FAILED.
    """
    if not isinstance(pilots, list):
        pilots = [pilots]

    self.advance(pilots, rps.PMGR_LAUNCHING, publish=True, push=False)

    # We can only use bulk submission for pilots which go to the same
    # target, thus sort them into buckets and launch the buckets
    # individually
    buckets = dict()
    for pilot in pilots:
        descr = pilot['description']
        buckets.setdefault(descr['resource'], dict()) \
               .setdefault(descr['access_schema'], list()) \
               .append(pilot)

    for resource, by_schema in buckets.items():
        for schema, bulk in by_schema.items():
            try:
                pids = [p['uid'] for p in bulk]
                self._log.info("Launching pilots on %s: %s", resource, pids)

                self._start_pilot_bulk(resource, schema, bulk)
                self.advance(bulk, rps.PMGR_ACTIVE_PENDING,
                             push=False, publish=True)

            except Exception:
                self._log.exception('bulk launch failed')
                self.advance(bulk, rps.FAILED, push=False, publish=True)
# --------------------------------------------------------------------------
#
def _start_pilot_bulk(self, resource, schema, pilots):
    """
    For each pilot, we prepare by determining what files need to be
    staged, and what job description needs to be submitted.

    We expect `_prepare_pilot(resource, rcfg, pilot, expand)` to return
    a dict with:

        {
          'jd': saga.job.Description,
          'ft': [
            { 'src': string  # absolute source file name
              'tgt': string  # relative target file name
              'rem': bool    # shall we remove src?
            },
            ... ]
        }

    When transferring data, we'll ensure that each src is only
    transferred once (in fact, we put all src files into a tarball and
    unpack that on the target side).

    The returned dicts are expected to only contain files which actually
    need staging, ie. which have not been staged during a previous pilot
    submission.  That implies one of two things: either this component is
    stateful, and remembers what has been staged -- which makes it
    difficult to use multiple component instances; or the component
    inspects the target resource for existing files -- which involves
    additional expensive remote hops.
    FIXME: since neither is implemented at this point we won't discuss
    the tradeoffs further -- right now files are unique per pilot bulk.

    Once all dicts are collected, we create one additional file which
    contains the staging information, and then pack all src files into
    a tarball for staging.  We transfer the tarball, and *immediately*
    trigger the untarring on the target resource, which is thus *not*
    part of the bootstrapping process.
    NOTE: this is to avoid untarring race conditions for multiple pilots,
          and also to simplify bootstrapping dependencies -- the
          bootstrappers are likely within the tarball after all...
    """
    rcfg = self._session.get_resource_config(resource, schema)
    sid  = self._session.uid

    # ----------------------------------------------------------------------
    # the rcfg can contain keys with string expansion placeholders where
    # values from the pilot description need filling in.  A prominent
    # example is `%(pd.project)s`, where the pilot description's `PROJECT`
    # value needs to be filled in (here in lowercase).
    #
    # FIXME: right now we assume all pilot descriptions to contain similar
    #        entries, so that the expansion is only done on the first PD.
    expand = dict()
    pd     = pilots[0]['description']
    for k, v in pd.items():
        if v is None:
            v = ''
        expand['pd.%s' % k] = v
        # strings additionally get upper- / lower-cased variants
        if isinstance(v, str):
            expand['pd.%s' % k.upper()] = v.upper()
            expand['pd.%s' % k.lower()] = v.lower()
        else:
            expand['pd.%s' % k.upper()] = v
            expand['pd.%s' % k.lower()] = v

    for k in rcfg:
        if isinstance(rcfg[k], str):
            orig     = rcfg[k]
            rcfg[k]  = rcfg[k] % expand
            expanded = rcfg[k]
            if orig != expanded:
                self._log.debug('RCFG:\n%s\n%s', orig, expanded)

    # we create a fake session_sandbox with all pilot_sandboxes in /tmp,
    # and then tar it up.  Once we untar that tarball on the target
    # machine, we should have all sandboxes and all files required to
    # bootstrap the pilots
    # FIXME: on untar, there is a race between multiple launcher
    #        components within the same session toward the same target
    #        resource.
    tmp_dir  = os.path.abspath(tempfile.mkdtemp(prefix='rp_agent_tar_dir'))
    tar_name = '%s.%s.tgz' % (sid, self.uid)
    tar_tgt  = '%s/%s'     % (tmp_dir, tar_name)
    tar_url  = rs.Url('file://localhost/%s' % tar_tgt)

    # we need the session sandbox url, but that is (at least in principle)
    # dependent on the schema to use for pilot startup.  So we confirm
    # here that the bulk is consistent wrt. to the schema.  Also include
    # `staging_input` files and place them in the `staging_area`.
    #
    # FIXME: if it is not, it needs to be splitted into schema-specific
    #        sub-bulks
    #
    schema = pd.get('access_schema')
    for pilot in pilots[1:]:
        assert(schema == pilot['description'].get('access_schema')), \
            'inconsistent scheme on launch / staging'

    # get and expand sandboxes
    session_sandbox = self._session._get_session_sandbox(pilots[0]).path
    session_sandbox = session_sandbox % expand

    # we will create the session sandbox before we untar, so we can use
    # that as workdir, and pack all paths relative to that session
    # sandbox.  That implies that we have to recheck that all URLs in fact
    # do point into the session sandbox.
    #
    # We also create a file `staging_output.json` for each pilot which
    # contains the list of files to be tarred up and prepared for output
    # staging
    ft_list = list()  # files to stage
    jd_list = list()  # jobs  to submit

    for pilot in pilots:

        os.makedirs('%s/%s' % (tmp_dir, pilot['uid']))

        info = self._prepare_pilot(resource, rcfg, pilot, expand)
        ft_list += info['ft']
        jd_list.append(info['jd'])
        self._prof.prof('staging_in_start', uid=pilot['uid'])

        # user-specified input files go into the pilot's staging area
        for fname in ru.as_list(pilot['description'].get('input_staging')):
            ft_list.append({'src': fname,
                            'tgt': '%s/staging_area/%s'
                                   % (pilot['uid'], os.path.basename(fname)),
                            'rem': False})

        # record requested output staging for later collection
        output_staging = pilot['description'].get('output_staging')
        if output_staging:
            fname = '%s/%s/staging_output.txt' % (tmp_dir, pilot['uid'])
            with open(fname, 'w') as fout:
                for entry in output_staging:
                    fout.write('%s\n' % entry)

    # link all staging files into the temporary sandbox tree, relative to
    # the session sandbox
    for ft in ft_list:
        src = os.path.abspath(ft['src'])
        tgt = os.path.relpath(os.path.normpath(ft['tgt']), session_sandbox)
      # src_dir = os.path.dirname(src)
        tgt_dir = os.path.dirname(tgt)

        if tgt_dir.startswith('..'):
            # target outside the session sandbox: keep the original target
          # raise ValueError('staging tgt %s outside pilot sbox: %s' % (ft['tgt'], tgt))
            tgt     = ft['tgt']
            tgt_dir = os.path.dirname(tgt)

        if not os.path.isdir('%s/%s' % (tmp_dir, tgt_dir)):
            os.makedirs('%s/%s' % (tmp_dir, tgt_dir))

        if src == '/dev/null':
            # we want an empty file -- touch it (tar will refuse to
            # handle a symlink to /dev/null)
            open('%s/%s' % (tmp_dir, tgt), 'a').close()
        else:
            # use a shell callout to account for wildcard expansion
            cmd = 'ln -s %s %s/%s' % (os.path.abspath(src), tmp_dir, tgt)
            out, err, ret = ru.sh_callout(cmd, shell=True)
            if ret:
                self._log.debug('out: %s', out)
                self._log.debug('err: %s', err)
                raise RuntimeError('callout failed: %s' % cmd)

    # tar.  If any command fails, this will raise.
    # ('h' dereferences the symlinks created above)
    cmd = "cd %s && tar zchf %s *" % (tmp_dir, tar_tgt)
    out, err, ret = ru.sh_callout(cmd, shell=True)
    if ret:
        self._log.debug('out: %s', out)
        self._log.debug('err: %s', err)
        raise RuntimeError('callout failed: %s' % cmd)

    # remove all files marked for removal-after-pack
    for ft in ft_list:
        if ft['rem']:
            os.unlink(ft['src'])

    fs_endpoint = rcfg['filesystem_endpoint']
    fs_url      = rs.Url(fs_endpoint)
    key         = str(fs_url)

    self._log.debug("rs.file.Directory ('%s')", fs_url)

    # reuse a cached filesystem handle, or create and cache a new one
    with self._cache_lock:
        if key in self._saga_fs_cache:
            fs = self._saga_fs_cache[key]
        else:
            fs = rsfs.Directory(fs_url, session=self._session)
            self._saga_fs_cache[key] = fs

    # transfer the tarball into the (remote) session sandbox
    tar_rem      = rs.Url(fs_url)
    tar_rem.path = "%s/%s" % (session_sandbox, tar_name)
    fs.copy(tar_url, tar_rem, flags=rsfs.CREATE_PARENTS)

    shutil.rmtree(tmp_dir)

    # we now need to untar on the target machine.
    js_url = ru.Url(pilots[0]['js_url'])

    # well, we actually don't need to talk to the rm, but only need
    # a shell on the headnode.  That seems true for all ResourceManager we
    # use right now.  So, lets convert the URL:
    if '+' in js_url.scheme:
        parts = js_url.scheme.split('+')
        if 'gsissh' in parts: js_url.scheme = 'gsissh'
        elif 'ssh' in parts: js_url.scheme = 'ssh'
    else:
        # In the non-combined '+' case we need to distinguish between
        # a url that was the result of a hop or a local rm.
        if js_url.scheme not in ['ssh', 'gsissh']:
            js_url.scheme = 'fork'
            js_url.host = 'localhost'

    # reuse a cached job service for the shell access, or create one
    with self._cache_lock:
        if js_url in self._saga_js_cache:
            js_tmp = self._saga_js_cache[js_url]
        else:
            js_tmp = rs.job.Service(js_url, session=self._session)
            self._saga_js_cache[js_url] = js_tmp

  # cmd = "tar zmxvf %s/%s -C / ; rm -f %s" % \
    cmd = "tar zmxvf %s/%s -C %s" % \
          (session_sandbox, tar_name, session_sandbox)
    j = js_tmp.run_job(cmd)
    j.wait()

    self._log.debug('tar cmd : %s', cmd)
    self._log.debug('tar done: %s, %s, %s', j.state, j.stdout, j.stderr)

    for pilot in pilots:
        self._prof.prof('staging_in_stop', uid=pilot['uid'])
        self._prof.prof('submission_start', uid=pilot['uid'])

    # look up or create JS for actual pilot submission.  This might result
    # in the same js url as above, or not.
    js_ep = rcfg['job_manager_endpoint']
    with self._cache_lock:
        if js_ep in self._saga_js_cache:
            js = self._saga_js_cache[js_ep]
        else:
            js = rs.job.Service(js_ep, session=self._session)
            self._saga_js_cache[js_ep] = js

    # now that the scripts are in place and configured,
    # we can launch the agent
    jc = rs.job.Container()

    for jd in jd_list:
        self._log.debug('jd: %s', pprint.pformat(jd.as_dict()))
        jc.add(js.create_job(jd))

    jc.run()

    # we assume here that the tasks arrive in the same order as the job
    # descriptions.  For uniform sets of pilots the order does not matter
    # much though.  Either way, this needs confirming on SAGA level
    # FIXME
    for j, jd in zip(jc.get_tasks(), jd_list):

        # do a quick error check
        if j.state == rs.FAILED:
            self._log.error('%s: %s : %s : %s', j.id, j.state, j.stderr, j.stdout)
            raise RuntimeError("SAGA Job state is FAILED. (%s)" % jd.name)

        # find the pilot this job belongs to, by matching the job name
        pilot = None
        for p in pilots:

            # we do not force unique job_names and multiple pilots may
            # have the same job_name.  By checking if p['uid'] is in PMGR
            # pilots we ensure that each pilot is checked only once.
            if p['uid'] in self._pilots:
                continue

            # SAGA job name is equal to a pilot's job_name if it exists,
            # otherwise the pilot's uid.  Pick job_name if it exists
            if p['description']['job_name']:
                p_name = p['description']['job_name']
            else:
                p_name = p['uid']

            if p_name == jd.name:
                pilot = p
                break

        assert(pilot)
        pid = pilot['uid']

        # Update the Pilot's state to 'PMGR_ACTIVE_PENDING' if SAGA job
        # submission was successful.  Since the pilot leaves the scope of
        # the PMGR for the time being, we update the complete DB document
        pilot['$all'] = True

        # FIXME: update the right pilot
        with self._pilots_lock:
            self._pilots[pid]          = dict()
            self._pilots[pid]['pilot'] = pilot
            self._pilots[pid]['job']   = j

        # make sure we watch that pilot
        with self._check_lock:
            self._checking.append(pid)

    for pilot in pilots:
        self._prof.prof('submission_stop', uid=pilot['uid'])
# --------------------------------------------------------------------------
#
def _prepare_pilot(self, resource, rcfg, pilot, expand):
pid = pilot["uid"]
ret = {'ft': list(),
'jd': None }
# ----------------------------------------------------------------------
# Database connection parameters
sid = self._session.uid
database_url = self._session.cfg.dburl
# some default values are determined at runtime
default_virtenv = '%%(resource_sandbox)s/ve.%s.%s' % \
(resource, self._rp_version)
# ----------------------------------------------------------------------
# pilot description and resource configuration
number_cores = pilot['description']['cores']
number_gpus = pilot['description']['gpus']
required_memory = pilot['description']['memory']
runtime = pilot['description']['runtime']
app_comm = pilot['description']['app_comm']
queue = pilot['description']['queue']
job_name = pilot['description']['job_name']
project = pilot['description']['project']
cleanup = pilot['description']['cleanup']
candidate_hosts = pilot['description']['candidate_hosts']
# ----------------------------------------------------------------------
# get parameters from resource cfg, set defaults where needed
agent_launch_method = rcfg.get('agent_launch_method')
agent_dburl = rcfg.get('agent_mongodb_endpoint', database_url)
agent_spawner = rcfg.get('agent_spawner', DEFAULT_AGENT_SPAWNER)
rc_agent_config = rcfg.get('agent_config', DEFAULT_AGENT_CONFIG)
agent_scheduler = rcfg.get('agent_scheduler')
tunnel_bind_device = rcfg.get('tunnel_bind_device')
default_queue = rcfg.get('default_queue')
forward_tunnel_endpoint = rcfg.get('forward_tunnel_endpoint')
resource_manager = rcfg.get('resource_manager')
mpi_launch_method = rcfg.get('mpi_launch_method', '')
pre_bootstrap_0 = rcfg.get('pre_bootstrap_0', [])
pre_bootstrap_1 = rcfg.get('pre_bootstrap_1', [])
python_interpreter = rcfg.get('python_interpreter')
task_launch_method = rcfg.get('task_launch_method')
rp_version = rcfg.get('rp_version')
virtenv_mode = rcfg.get('virtenv_mode', DEFAULT_VIRTENV_MODE)
virtenv = rcfg.get('virtenv', default_virtenv)
cores_per_node = rcfg.get('cores_per_node', 0)
gpus_per_node = rcfg.get('gpus_per_node', 0)
lfs_path_per_node = rcfg.get('lfs_path_per_node', None)
lfs_size_per_node = rcfg.get('lfs_size_per_node', 0)
python_dist = rcfg.get('python_dist')
virtenv_dist = rcfg.get('virtenv_dist', DEFAULT_VIRTENV_DIST)
cu_tmp = rcfg.get('cu_tmp')
spmd_variation = rcfg.get('spmd_variation')
shared_filesystem = rcfg.get('shared_filesystem', True)
stage_cacerts = rcfg.get('stage_cacerts', False)
cu_pre_exec = rcfg.get('cu_pre_exec')
cu_post_exec = rcfg.get('cu_post_exec')
export_to_cu = rcfg.get('export_to_cu')
mandatory_args = rcfg.get('mandatory_args', [])
system_architecture = rcfg.get('system_architecture', {})
saga_jd_supplement = rcfg.get('saga_jd_supplement', {})
self._log.debug(cores_per_node)
self._log.debug(pprint.pformat(rcfg))
# make sure that mandatory args are known
for ma in mandatory_args:
if pilot['description'].get(ma) is None:
raise ValueError('attribute "%s" is required for "%s"'
% (ma, resource))
# get pilot and global sandbox
resource_sandbox = self._session._get_resource_sandbox(pilot)
session_sandbox = self._session._get_session_sandbox (pilot)
pilot_sandbox = self._session._get_pilot_sandbox (pilot)
client_sandbox = self._session._get_client_sandbox ()
pilot['resource_sandbox'] = str(resource_sandbox) % expand
pilot['session_sandbox'] = str(session_sandbox) % expand
pilot['pilot_sandbox'] = str(pilot_sandbox) % expand
pilot['client_sandbox'] = str(client_sandbox)
# from here on we need only paths
resource_sandbox = resource_sandbox.path % expand
session_sandbox = session_sandbox .path % expand
pilot_sandbox = pilot_sandbox .path % expand
# client_sandbox = client_sandbox # not expanded
# Agent configuration that is not part of the public API.
# The agent config can either be a config dict, or
# a string pointing to a configuration name. If neither
# is given, check if 'RADICAL_PILOT_AGENT_CONFIG' is
# set. The last fallback is 'agent_default'
agent_config = pilot['description'].get('_config')
if not agent_config:
agent_config = os.environ.get('RADICAL_PILOT_AGENT_CONFIG')
if not agent_config:
agent_config = rc_agent_config
if not job_name:
job_name = pid
if isinstance(agent_config, dict):
# use dict as is
agent_cfg = agent_config
elif isinstance(agent_config, str):
agent_cfg = ru.Config('radical.pilot',
category='agent',
name=agent_config)
else:
# we can't handle this type
raise TypeError('agent config must be string (config name) or dict')
# expand variables in virtenv string
virtenv = virtenv % {'pilot_sandbox' : pilot_sandbox,
'session_sandbox' : session_sandbox,
'resource_sandbox': resource_sandbox}