Skip to content

Commit

Permalink
Merge pull request #363 from HSF/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
wguanicedew authored Oct 31, 2024
2 parents 684958c + 9bd6c4d commit 8b9558b
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 43 deletions.
5 changes: 5 additions & 0 deletions main/lib/idds/agents/carrier/plugins/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ def get_task_params(self, work):

task_param_map['reqID'] = work.request_id

if work.container_options:
if type(work.container_options) in [dict] and work.container_options.get('container_image', None):
container_image = work.container_options.get('container_image', None)
task_param_map['container_name'] = container_image

return task_param_map

def submit(self, *args, **kwargs):
Expand Down
10 changes: 8 additions & 2 deletions main/lib/idds/agents/conductor/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2019 - 2023
# - Wen Guan, <[email protected]>, 2019 - 2024

import datetime
import random
Expand Down Expand Up @@ -120,12 +120,15 @@ def get_new_messages(self):
destination = [MessageDestination.Outside, MessageDestination.ContentExt, MessageDestination.AsyncResult]
messages = core_messages.retrieve_messages(status=MessageStatus.New,
min_request_id=BaseAgent.min_request_id,
delay=60,
record_fetched=True,
bulk_size=self.retrieve_bulk_size,
destination=destination)

# self.logger.debug("Main thread get %s new messages" % len(messages))
if messages:
self.logger.info("Main thread get %s new messages" % len(messages))

return messages

def get_retry_messages(self):
Expand All @@ -140,9 +143,12 @@ def get_retry_messages(self):

retry_messages = []
destination = [MessageDestination.Outside, MessageDestination.ContentExt, MessageDestination.AsyncResult]
messages_d = core_messages.retrieve_messages(status=MessageStatus.Delivered,
messages_d = core_messages.retrieve_messages(status=[MessageStatus.Delivered, MessageStatus.Fetched],
min_request_id=BaseAgent.min_request_id,
use_poll_period=True,
delay=120,
record_fetched=True,
record_fetched_status=MessageStatus.Delivered,
bulk_size=self.retrieve_bulk_size,
destination=destination) # msg_type=msg_type)
if messages_d:
Expand Down
34 changes: 24 additions & 10 deletions main/lib/idds/core/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
operations related to Messages.
"""

import datetime
import threading

from idds.common.constants import MessageDestination, MessageType, MessageStatus
Expand Down Expand Up @@ -46,11 +47,13 @@ def add_messages(messages, bulk_size=1000, session=None):
return orm_messages.add_messages(messages, bulk_size=bulk_size, session=session)


@read_session
@transactional_session
def retrieve_messages(bulk_size=None, msg_type=None, status=None, destination=None,
source=None, request_id=None, workload_id=None, transform_id=None,
processing_id=None, use_poll_period=False, retries=None, delay=None,
min_request_id=None, fetching_id=None, internal_id=None, session=None):
processing_id=None, use_poll_period=False, retries=None, delay=60,
min_request_id=None, fetching_id=None, internal_id=None,
record_fetched=False, record_fetched_status=MessageStatus.Fetched,
session=None):
"""
Retrieve up to $bulk messages.
Expand All @@ -66,13 +69,24 @@ def retrieve_messages(bulk_size=None, msg_type=None, status=None, destination=No
hb_thread = threading.current_thread()
fetching_id = hb_thread.ident

return orm_messages.retrieve_messages(bulk_size=bulk_size, msg_type=msg_type,
status=status, source=source, destination=destination,
request_id=request_id, workload_id=workload_id,
transform_id=transform_id, processing_id=processing_id,
retries=retries, delay=delay, fetching_id=fetching_id,
min_request_id=min_request_id, internal_id=internal_id,
use_poll_period=use_poll_period, session=session)
messages = orm_messages.retrieve_messages(bulk_size=bulk_size, msg_type=msg_type,
status=status, source=source, destination=destination,
request_id=request_id, workload_id=workload_id,
transform_id=transform_id, processing_id=processing_id,
retries=retries, delay=delay, fetching_id=fetching_id,
min_request_id=min_request_id, internal_id=internal_id,
use_poll_period=use_poll_period, session=session)
if record_fetched:
to_updates = []
for msg in messages:
to_update = {'msg_id': msg['msg_id'],
'request_id': msg['request_id'],
'poll_period': datetime.timedelta(seconds=delay),
'status': record_fetched_status}
to_updates.append(to_update)
if to_updates:
orm_messages.update_messages(to_updates, min_request_id=min_request_id, session=session)
return messages


@read_session
Expand Down
11 changes: 8 additions & 3 deletions main/lib/idds/orm/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from idds.common.constants import MessageDestination
from idds.common.utils import group_list
from idds.orm.base import models
from idds.orm.base.session import read_session, transactional_session
from idds.orm.base.session import transactional_session


@transactional_session
Expand Down Expand Up @@ -133,7 +133,7 @@ def update_messages(messages, bulk_size=1000, use_bulk_update_mappings=False, re
raise exceptions.DatabaseException('Could not persist message: %s' % str(e))


@read_session
@transactional_session
def retrieve_messages(bulk_size=1000, msg_type=None, status=None, source=None,
destination=None, request_id=None, workload_id=None,
transform_id=None, processing_id=None, fetching_id=None,
Expand Down Expand Up @@ -162,13 +162,18 @@ def retrieve_messages(bulk_size=1000, msg_type=None, status=None, source=None,
msg_type = [msg_type]
if len(msg_type) == 1:
msg_type = [msg_type[0], msg_type[0]]
if status is not None:
if not isinstance(status, (list, tuple)):
status = [status]
if len(status) == 1:
status = [status[0], status[0]]

query = session.query(models.Message)

if msg_type is not None:
query = query.filter(models.Message.msg_type.in_(msg_type))
if status is not None:
query = query.filter_by(status=status)
query = query.filter(models.Message.status.in_(status))
if source is not None:
query = query.filter_by(source=source)
if destination is not None:
Expand Down
1 change: 1 addition & 0 deletions main/lib/idds/tests/panda_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
task_ids = [16700, 16704, 17055, 17646, 17792, 18509, 19754, 21666, 21714, 21739, 16148, 16149, 16150]
task_ids = [473, 472] + [i for i in range(325, 345)]
task_ids = [476, 477, 478]
task_ids = [937, 938, 940, 941]
for task_id in task_ids:
print("Killing %s" % task_id)
ret = Client.killTask(task_id, verbose=True)
Expand Down
12 changes: 6 additions & 6 deletions monitor/data/conf.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@

var appConfig = {
'iddsAPI_request': "https://lxplus972.cern.ch:443/idds/monitor_request/null/null",
'iddsAPI_transform': "https://lxplus972.cern.ch:443/idds/monitor_transform/null/null",
'iddsAPI_processing': "https://lxplus972.cern.ch:443/idds/monitor_processing/null/null",
'iddsAPI_request_detail': "https://lxplus972.cern.ch:443/idds/monitor/null/null/true/false/false",
'iddsAPI_transform_detail': "https://lxplus972.cern.ch:443/idds/monitor/null/null/false/true/false",
'iddsAPI_processing_detail': "https://lxplus972.cern.ch:443/idds/monitor/null/null/false/false/true"
'iddsAPI_request': "https://lxplus925.cern.ch:443/idds/monitor_request/null/null",
'iddsAPI_transform': "https://lxplus925.cern.ch:443/idds/monitor_transform/null/null",
'iddsAPI_processing': "https://lxplus925.cern.ch:443/idds/monitor_processing/null/null",
'iddsAPI_request_detail': "https://lxplus925.cern.ch:443/idds/monitor/null/null/true/false/false",
'iddsAPI_transform_detail': "https://lxplus925.cern.ch:443/idds/monitor/null/null/false/true/false",
'iddsAPI_processing_detail': "https://lxplus925.cern.ch:443/idds/monitor/null/null/false/false/true"
}
10 changes: 5 additions & 5 deletions workflow/lib/idds/iworkflow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,20 +210,20 @@ def run_func(self, func, pre_kwargs, args, kwargs):
"""
try:
logging.info(f"func type: {type(func)}: {str(func)}")
logging.info("pre_kwargs type: {type(pre_kwargs)}: {str(pre_kwargs)}")
logging.info("args type: {type(args)}: {str(args)}")
logging.info("kwargs type: {type(kwargs)}: {str(kwargs)}")
logging.info(f"pre_kwargs type: {type(pre_kwargs)}: {str(pre_kwargs)}")
logging.info(f"args type: {type(args)}: {str(args)}")
logging.info(f"kwargs type: {type(kwargs)}: {str(kwargs)}")
kwargs_copy = copy.deepcopy(pre_kwargs)
kwargs_copy.update(kwargs)
logging.info("start to run function: {str(func)}")
logging.info(f"start to run function: {str(func)}")
if kwargs_copy:
ret = func(*args, **kwargs_copy)
else:
ret = func(*args)
logging.info(f"Successfully run function, ret: {ret}")
return True, ret, None
except Exception as ex:
logging.error("Failed to run the function: {str(ex)}")
logging.error(f"Failed to run the function: {str(ex)}")
logging.error(traceback.format_exc())
return False, None, str(ex)

Expand Down
42 changes: 32 additions & 10 deletions workflow/lib/idds/iworkflow/work.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

class WorkContext(Context):

def __init__(self, name=None, workflow_context=None, source_dir=None, init_env=None):
def __init__(self, name=None, workflow_context=None, source_dir=None, init_env=None, container_options=None):
super(WorkContext, self).__init__()
self._workflow_context = workflow_context
self._transform_id = None
Expand All @@ -54,6 +54,7 @@ def __init__(self, name=None, workflow_context=None, source_dir=None, init_env=N
self._map_results = False

self.init_env = init_env
self.container_options = container_options

def get_service(self):
return self._workflow_context.service
Expand Down Expand Up @@ -315,6 +316,16 @@ def init_env(self, value):
if self._init_env:
self._init_env = self._init_env + " "

@property
def container_options(self):
    """Container options for this work.

    Returns the options set directly on this context when present,
    otherwise falls back to the enclosing workflow context's options.
    """
    return self._container_options or self._workflow_context.container_options

@container_options.setter
def container_options(self, value):
    """Override the container options for this work context."""
    self._container_options = value

def get_idds_server(self):
return self._workflow_context.get_idds_server()

Expand Down Expand Up @@ -356,7 +367,8 @@ def get_clean_env(self):
class Work(Base):

def __init__(self, func=None, workflow_context=None, context=None, pre_kwargs=None, args=None, kwargs=None, multi_jobs_kwargs_list=None,
current_job_kwargs=None, map_results=False, source_dir=None, init_env=None, is_unique_func_name=False, name=None):
current_job_kwargs=None, map_results=False, source_dir=None, init_env=None, is_unique_func_name=False, name=None,
container_options=None):
"""
Init a workflow.
"""
Expand Down Expand Up @@ -385,7 +397,7 @@ def __init__(self, func=None, workflow_context=None, context=None, pre_kwargs=No
if context:
self._context = context
else:
self._context = WorkContext(name=self._name, workflow_context=workflow_context, init_env=init_env)
self._context = WorkContext(name=self._name, workflow_context=workflow_context, init_env=init_env, container_options=container_options)

self._async_ret = None

Expand Down Expand Up @@ -590,6 +602,14 @@ def enable_separate_log(self):
def enable_separate_log(self, value):
self._context.enable_separate_log = value

@property
def container_options(self):
    """Container options, read through the underlying work context."""
    ctx = self._context
    return ctx.container_options

@container_options.setter
def container_options(self, value):
    """Store container options on the underlying work context."""
    ctx = self._context
    ctx.container_options = value

@property
def token(self):
return self._context.token
Expand Down Expand Up @@ -1017,7 +1037,6 @@ def run_local(self):
self._func = func

if self._context.distributed:
rets = None
args_copy = copy.deepcopy(args)
pre_kwargs_copy = copy.deepcopy(pre_kwargs)
kwargs_copy = copy.deepcopy(kwargs)
Expand All @@ -1030,15 +1049,16 @@ def run_local(self):

request_id = self._context.request_id
transform_id = self._context.transform_id
logging.info("publishing AsyncResult to (request_id: %s, transform_id: %s): %s" % (request_id, transform_id, rets))
ret_log = f"(status: {ret_status}, return: {ret_output}, error: {ret_err})"
logging.info(f"publishing AsyncResult to (request_id: {request_id}, transform_id: {transform_id}): {ret_log}")
async_ret = AsyncResult(self._context, name=self.get_func_name(), internal_id=self.internal_id, current_job_kwargs=current_job_kwargs)
async_ret.publish(ret_output, ret_status=ret_status, ret_error=ret_err)

if not self.map_results:
self._results = rets
self._results = ret_output
else:
self._results = MapResult()
self._results.add_result(name=self.get_func_name(), args=current_job_kwargs, result=rets)
self._results.add_result(name=self.get_func_name(), args=current_job_kwargs, result=ret_output)
return self._results
else:
if not multi_jobs_kwargs_list:
Expand Down Expand Up @@ -1143,10 +1163,11 @@ def run_work_distributed(w):


# foo = work(arg)(foo)
def work(func=None, *, workflow=None, pre_kwargs={}, name=None, return_work=False, map_results=False, lazy=False, init_env=None, no_wraps=False):
def work(func=None, *, workflow=None, pre_kwargs={}, name=None, return_work=False, map_results=False, lazy=False, init_env=None, no_wraps=False,
container_options=None):
if func is None:
return functools.partial(work, workflow=workflow, pre_kwargs=pre_kwargs, return_work=return_work, no_wraps=no_wraps,
name=name, map_results=map_results, lazy=lazy, init_env=init_env)
name=name, map_results=map_results, lazy=lazy, init_env=init_env, container_options=container_options)

if 'IDDS_IGNORE_WORK_DECORATOR' in os.environ:
return func
Expand All @@ -1163,7 +1184,8 @@ def wrapper(*args, **kwargs):
if workflow_context:
logging.debug("setup work")
w = Work(workflow_context=workflow_context, func=func, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs,
name=name, multi_jobs_kwargs_list=multi_jobs_kwargs_list, map_results=map_results, init_env=init_env)
name=name, multi_jobs_kwargs_list=multi_jobs_kwargs_list, map_results=map_results, init_env=init_env,
container_options=container_options)
# if distributed:

if return_work:
Expand Down
Loading

0 comments on commit 8b9558b

Please sign in to comment.