Skip to content

Commit

Permalink
Merge pull request #3097 from radical-cybertools/fix/raptor_worker_registration
Browse files Browse the repository at this point in the history

Fix/raptor worker registration
  • Loading branch information
mtitov authored Jan 24, 2024
2 parents d7be4d9 + e8acf97 commit a912d22
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 33 deletions.
18 changes: 8 additions & 10 deletions src/radical/pilot/raptor/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,6 @@ def __init__(self, cfg: ru.Config = None):
self._rsbox = os.environ['RP_RESOURCE_SANDBOX']
self._reg_addr = os.environ['RP_REGISTRY_ADDRESS']

self._reg = ru.zmq.RegistryClient(url=self._reg_addr)
self._reg.dump(self._uid)

# get hb configs
self._hb_freq = self._reg['rcfg.raptor.hb_frequency']
self._hb_tout = self._reg['rcfg.raptor.hb_timeout']

self._workers = dict() # wid: worker
self._tasks = dict() # bookkeeping of submitted requests
self._exec_tasks = list() # keep track of executable tasks
Expand All @@ -92,6 +85,10 @@ def __init__(self, cfg: ru.Config = None):

super().__init__(ccfg, self._session)

# get hb configs (RegistryClient instance is initiated in Session)
self._hb_freq = self._session.rcfg.raptor.hb_frequency
self._hb_tout = self._session.rcfg.raptor.hb_timeout

self._log.debug('hb freq: %s', self._hb_freq)
self._log.debug('hb tout: %s', self._hb_tout)

Expand Down Expand Up @@ -401,10 +398,9 @@ def submit_workers(self, descriptions: List[TaskDescription]
# ensure that defaults and backward compatibility kick in
td.verify()

# the default worker needs it's own task description to derive the
# the default worker needs its own task description to derive the
# amount of available resources
self._reg['raptor.%s.cfg' % self._uid] = td.as_dict()
# self._reg.dump('raptor_master')
self._reg['raptor.%s.cfg' % td.uid] = td.as_dict()

# all workers run in the same sandbox as the master
task = dict()
Expand Down Expand Up @@ -441,6 +437,8 @@ def submit_workers(self, descriptions: List[TaskDescription]

self.advance(tasks, publish=True, push=True)

# dump registry with all worker descriptions ("raptor.<worker_uid>.cfg")
self._reg.dump(self._uid)
return [task['uid'] for task in tasks]


Expand Down
39 changes: 20 additions & 19 deletions src/radical/pilot/raptor/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,19 @@ def __init__(self, manager, rank, raptor_id):
self._ranks = int(os.environ['RP_RANKS'])

self._reg = ru.zmq.RegistryClient(url=self._reg_addr)
self._reg.dump(self._uid)
self._cfg = ru.Config(cfg=self._reg['cfg'])

self._hb_delay = self._reg['rcfg.raptor.hb_delay']

self._cfg = ru.Config(cfg=self._reg['cfg'])

self._log = ru.Logger(name=self._uid, ns='radical.pilot.worker',
self._log = ru.Logger(name=self._uid,
ns='radical.pilot.worker',
level=self._cfg.log_lvl,
debug=self._cfg.debug_lvl,
targets=self._cfg.log_tgt,
path=self._cfg.path)
self._prof = ru.Profiler(name='%s.%04d' % (self._uid, self._rank),
ns='radical.pilot.worker',
path=self._cfg.path)
path=self._sbox)

# register for lifetime management messages on the control pubsub
psbox = os.environ['RP_PILOT_SANDBOX']
Expand All @@ -75,6 +74,7 @@ def __init__(self, manager, rank, raptor_id):
# let ZMQ settle
time.sleep(1)

self._hb_register_count = 60
# run heartbeat thread in all ranks (one hb msg every `n` seconds)
self._log.debug('hb delay: %s', self._hb_delay)
self._hb_thread = mt.Thread(target=self._hb_worker)
Expand Down Expand Up @@ -109,28 +109,29 @@ def __init__(self, manager, rank, raptor_id):

# the manager (rank 0) registers the worker with the master
if self._manager:

self._log.debug('register: %s / %s', self._uid, self._raptor_id)
self._ctrl_pub.put(rpc.CONTROL_PUBSUB, reg_msg)

# # FIXME: we never unregister on termination
# self._ctrl_pub.put(rpc.CONTROL_PUBSUB, {'cmd': 'worker_unregister',
# 'arg': {'uid' : self._uid}})

# wait for raptor response
self._log.debug('wait for registration to complete')
count = 0
while not self._reg_event.wait(timeout=1):
if count < 60:
count += 1
self._log.debug('re-register: %s / %s', self._uid, self._raptor_id)
self._ctrl_pub.put(rpc.CONTROL_PUBSUB, reg_msg)
else:
self.stop()
self.join()
self._log.error('registration with master timed out')
raise RuntimeError('registration with master timed out')
# wait for raptor response
self._log.debug('wait for registration to complete')
count = 0
while not self._reg_event.wait(timeout=5):
if count < self._hb_register_count:
count += 1
self._log.debug('re-register: %s / %s', self._uid, self._raptor_id)
self._ctrl_pub.put(rpc.CONTROL_PUBSUB, reg_msg)
else:
self.stop()
self.join()
self._log.error('registration with master timed out')
raise RuntimeError('registration with master timed out')

self._log.debug('registration with master ok')
self._log.debug('registration with master ok')


# --------------------------------------------------------------------------
Expand Down
9 changes: 5 additions & 4 deletions src/radical/pilot/raptor/worker_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@ def __init__(self, raptor_id : str):
cb=self._request_cb)

# the master should have stored our own task description in the registry
self._reg.dump('raptor_worker')
self._descr = self._reg['raptor.%s.cfg' % self._uid]
self._descr = self._reg['raptor.%s.cfg' % self._uid] or {}

# keep worker ID and rank
self._n_cores = int(os.environ.get('cores_per_rank', 1))
self._n_gpus = int(os.environ.get('gpus_per_rank', 0))
self._n_cores = int(self._descr.get('cores_per_rank') or
os.getenv('RP_CORES_PER_RANK', 1))
self._n_gpus = int(self._descr.get('gpus_per_rank') or
os.getenv('RP_GPUS_PER_RANK', 0))

# We need to make sure to run only up to `gpn` tasks using a gpu
# within that pool, so need a separate counter for that.
Expand Down

0 comments on commit a912d22

Please sign in to comment.