Skip to content

Commit

Permalink
Merge pull request DIRACGrid#7105 from martynia/integration_janusz_pi…
Browse files Browse the repository at this point in the history
…lotlogsWrapper_get_dev

[Integration] Modify dirac-admin-get-pilot-output to get remote pilot log
  • Loading branch information
fstagni authored Sep 19, 2023
2 parents c728ad4 + bad7e9e commit efa94db
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 9 deletions.
7 changes: 4 additions & 3 deletions src/DIRAC/Interfaces/API/DiracAdmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ def getJobPilotOutput(self, jobID, directory=""):
:param job: JobID
:type job: integer or string
:param str directory: a directory to download logs to.
:return: S_OK,S_ERROR
"""
if not directory:
Expand Down Expand Up @@ -468,13 +469,13 @@ def getJobPilotOutput(self, jobID, directory=""):

#############################################################################
def getPilotOutput(self, gridReference, directory=""):
"""Retrieve the pilot output (std.out and std.err) for an existing job in the WMS.
"""Retrieve the pilot output (std.out and std.err) for an existing pilot reference.
>>> gLogger.notice(dirac.getJobPilotOutput(12345))
{'OK': True, 'Value': {}}
:param job: JobID
:type job: integer or string
:param str gridReference: pilot reference
:param str directory: a directory to download logs to.
:return: S_OK,S_ERROR
"""
if not isinstance(gridReference, str):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""
Pilot logging plugin abstract class.
"""
from abc import ABC, abstractmethod
from DIRAC import S_OK, S_ERROR, gLogger

sLog = gLogger.getSubLogger(__name__)


class DownloadPlugin(ABC):
    """
    Abstract base class for remote pilot log retrievers.

    It declares the method used to download pilot log files from a remote storage to the server.
    Every pilot log download plugin should derive from this class and implement the (sub)set of
    methods required by :class:`PilotManagerHandler`.
    """

    @abstractmethod
    def getRemotePilotLogs(self, pilotStamp, vo):
        """
        Retrieve the logs of a single pilot, identified by its unique stamp and the VO name.

        :param str pilotStamp: pilot stamp.
        :param str vo: VO name of the pilot which generated the logs.
        :return: S_OK or S_ERROR
        :rtype: dict
        """
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""
File cache pilot log downloader.
"""
import os
import tempfile
from DIRAC import S_OK, S_ERROR, gLogger, gConfig
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
from DIRAC.WorkloadManagementSystem.Client.PilotLoggingPlugins.DownloadPlugin import DownloadPlugin
from DIRAC.WorkloadManagementSystem.Client.TornadoPilotLoggingClient import TornadoPilotLoggingClient

sLog = gLogger.getSubLogger(__name__)


class FileCacheDownloadPlugin(DownloadPlugin):
    """
    Pilot log download plugin.

    Logs are looked up in the server file cache first (through the Tornado pilot logging client)
    and, if that fails, downloaded from an SE with the DataManager shifter credentials.
    """

    def __init__(self):
        """
        Sets the client for downloading incomplete log files from the server cache.
        """
        self.tornadoClient = TornadoPilotLoggingClient()

    def getRemotePilotLogs(self, pilotStamp, vo=None):
        """
        Pilot log getter method, carrying the unique pilot identity and a VO name.

        :param str pilotStamp: pilot stamp.
        :param str vo: VO name of a user/pilot which generated the logs.
        :return: S_OK or S_ERROR; on success the value is a dict with the log text under "StdOut"
                 when the logs come from the SE, or whatever the logging service returns otherwise.
        :rtype: dict
        """

        opsHelper = Operations(vo=vo)
        uploadPath = opsHelper.getValue("Pilot/UploadPath", "")
        lfn = os.path.join(uploadPath, pilotStamp + ".log")
        sLog.info("LFN to download: ", lfn)

        # get pilot credentials which uploaded logs to an external storage:
        res = opsHelper.getOptionsDict("Shifter/DataManager")
        if not res["OK"]:
            message = f"No shifter defined for VO: {vo} - needed to retrieve the logs !"
            sLog.error(message)
            return S_ERROR(message)

        proxyUser = res["Value"].get("User")
        proxyGroup = res["Value"].get("Group")

        sLog.info(f"Proxy used for retrieving pilot logs: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}")

        # attempt to get logs from server first:
        res = self._getLogsFromServer(pilotStamp, vo)
        if not res["OK"]:
            # from SE:
            res = self._downloadLogs(  # pylint: disable=unexpected-keyword-arg
                lfn, pilotStamp, proxyUserName=proxyUser, proxyUserGroup=proxyGroup
            )

        return res

    @executeWithUserProxy
    def _downloadLogs(self, lfn, pilotStamp):
        """
        Download a finalised pilot log file from an SE and return its content.

        :param str lfn: LFN of the log file.
        :param str pilotStamp: pilot stamp; the downloaded file is expected to be named <pilotStamp>.log
        :return: S_OK with a dict holding the log text under "StdOut", or S_ERROR
        :rtype: dict
        """

        # The context manager owns the scratch directory: it is created with private permissions
        # and removed (together with the downloaded copy) on every exit path. Keeping only the
        # ``.name`` of a discarded TemporaryDirectory object would leave a directory behind
        # on each call.
        with tempfile.TemporaryDirectory() as filepath:
            res = DataManager().getFile(lfn, destinationDir=filepath)
            sLog.debug("getFile result:", res)
            if not res["OK"]:
                sLog.error("Failed to contact storage")
                return res
            if lfn in res["Value"]["Failed"]:
                sLog.error("Failed to retrieve a log file:", res["Value"]["Failed"])
                return S_ERROR(f"Failed to retrieve a log file: {res['Value']['Failed']}")

            filename = os.path.join(filepath, pilotStamp + ".log")
            try:
                with open(filename) as f:
                    stdout = f.read()
            except FileNotFoundError as err:
                sLog.error(f"Error opening a log file:{filename}", err)
                return S_ERROR(repr(err))

        return S_OK({"StdOut": stdout})

    @executeWithUserProxy
    def _getLogsFromServer(self, logfile, vo):
        """
        Get a file from the server cache area. The file is most likely not finalised, since finalised files
        are copied to an SE and deleted. Both logfile and logfile.log are tried (in that order) should the
        finalised file still be on the server.

        :param str logfile: pilot log filename
        :param str vo: VO name
        :return: S_OK or S_ERROR
        :rtype: dict
        """

        res = self.tornadoClient.getLogs(logfile, vo)
        if not res["OK"]:
            res = self.tornadoClient.getLogs(logfile + ".log", vo)
        return res
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,26 @@ def getMeta(self):
return S_OK(self.meta)
return S_ERROR("No Pilot logging directory defined")

def getLogs(self, logfile, vo):
    """
    Get the "instant" logs from the Tornado log storage area. These are not finalised (incomplete) logs.

    :param str logfile: log file name, relative to the VO sub-directory of the log area
    :param str vo: VO name, used as a sub-directory of the log area
    :return: Dirac S_OK containing the logs under the "StdOut" key, or S_ERROR
    :rtype: dict
    """

    logArea = self.meta["LogPath"]
    filename = os.path.join(logArea, vo, logfile)

    # logfile and vo are supplied by the caller of the service: make sure the resolved path
    # stays inside the log area, so that "../" or absolute components cannot be used
    # to read arbitrary files.
    baseDir = os.path.realpath(logArea)
    resolved = os.path.realpath(filename)
    if os.path.commonpath([baseDir, resolved]) != baseDir:
        sLog.error("Requested log file is outside of the log storage area:", filename)
        return S_ERROR(f"Invalid log file location: {filename}")

    resultDict = {}
    try:
        with open(filename) as f:
            resultDict["StdOut"] = f.read()
    except OSError as err:
        # covers a missing file as well as a directory or an unreadable file
        sLog.error(f"Error opening a log file:{filename}", err)
        return S_ERROR(repr(err))

    return S_OK(resultDict)

def _verifyUUIDPattern(self, logfile):
"""
Verify if the name of the log file matches the required pattern.
Expand Down
74 changes: 69 additions & 5 deletions src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

from DIRAC import S_OK, S_ERROR
import DIRAC.Core.Utilities.TimeUtilities as TimeUtilities

from DIRAC.ConfigurationSystem.Client.Helpers import Registry
from DIRAC.Core.Utilities.Decorators import deprecated
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOForGroup
from DIRAC.Core.DISET.RequestHandler import getServiceOption
from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getUsernameForDN, getDNForUsername
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
from DIRAC.WorkloadManagementSystem.Service.WMSUtilities import (
getPilotCE,
Expand All @@ -35,6 +35,10 @@ def initializeHandler(cls, serviceInfoDict):
except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp}")

# prepare remote pilot plugin initialization
defaultOption, defaultClass = "DownloadPlugin", "FileCacheDownloadPlugin"
cls.configValue = getServiceOption(serviceInfoDict, defaultOption, defaultClass)
cls.loggingPlugin = None
return S_OK()

##############################################################################
Expand Down Expand Up @@ -92,9 +96,16 @@ def export_addPilotTQRef(cls, pilotRef, taskQueueID, ownerGroup, gridType="DIRAC
types_getPilotOutput = [str]

def export_getPilotOutput(self, pilotReference):
"""Get the pilot job standard output and standard error files for the Grid
job reference
"""
Get the pilot job standard output and standard error files for a pilot reference.
Handles both classic, CE-based logs and remote logs. The type of logs returned is determined
by the server.
:param str pilotReference:
:return: S_OK or S_ERROR Dirac object
:rtype: dict
"""

result = self.pilotAgentsDB.getPilotInfo(pilotReference)
if not result["OK"]:
self.log.error("Failed to get info for pilot", result["Message"])
Expand All @@ -104,6 +115,26 @@ def export_getPilotOutput(self, pilotReference):
return S_ERROR("Pilot info is empty")

pilotDict = result["Value"][pilotReference]
vo = getVOForGroup(pilotDict["OwnerGroup"])
opsHelper = Operations(vo=vo)
remote = opsHelper.getValue("Pilot/RemoteLogsPriority", False)
# classic logs first, by default
funcs = [self._getPilotOutput, self._getRemotePilotOutput]
if remote:
funcs.reverse()

result = funcs[0](pilotReference, pilotDict)
if not result["OK"]:
self.log.warn("Pilot log retrieval failed (first attempt), remote ?", remote)
result = funcs[1](pilotReference, pilotDict)
return result
else:
return result

def _getPilotOutput(self, pilotReference, pilotDict):
"""Get the pilot job standard output and standard error files for the Grid
job reference
"""

group = pilotDict["OwnerGroup"]

Expand Down Expand Up @@ -158,6 +189,39 @@ def export_getPilotOutput(self, pilotReference):
shutil.rmtree(ce.ceParameters["WorkingDirectory"])
return S_OK(resultDict)

def _getRemotePilotOutput(self, pilotReference, pilotDict):
    """
    Retrieve the remote log files of a pilot through the configured download plugin.

    The plugin class named by ``self.configValue`` is loaded and instantiated the first time
    it is needed, i.e. while ``self.loggingPlugin`` is still None.

    :param str pilotReference: pilot reference (not used by this method)
    :param dict pilotDict: pilot info; the "PilotStamp" and "OwnerGroup" keys are read
    :return: the plugin result, completed with "OwnerGroup" and an empty "FileList" on success;
             the loader error if the plugin cannot be loaded
    :rtype: dict
    """

    stamp = pilotDict["PilotStamp"]
    ownerGroup = pilotDict["OwnerGroup"]
    voName = getVOForGroup(ownerGroup)

    if self.loggingPlugin is None:
        pluginName = self.configValue
        loaded = ObjectLoader().loadObject(
            f"WorkloadManagementSystem.Client.PilotLoggingPlugins.{pluginName}", pluginName
        )
        if not loaded["OK"]:
            self.log.error("Failed to load LoggingPlugin", f"{pluginName}: {loaded['Message']}")
            return loaded

        pluginClass = loaded["Value"]
        self.loggingPlugin = pluginClass()
        self.log.info("Loaded: PilotLoggingPlugin class", pluginName)

    logs = self.loggingPlugin.getRemotePilotLogs(stamp, voName)
    if not logs["OK"]:
        # the error is handed back to the caller untouched
        return logs

    logs["Value"]["OwnerGroup"] = ownerGroup
    logs["Value"]["FileList"] = []
    return logs

##############################################################################
types_getPilotInfo = [[list, str]]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ def export_getMetadata(self):
"""
return self.loggingPlugin.getMeta()

def export_getLogs(self, logfile, vo):
    """
    Get (not yet finalised) logs from the server.

    The call is delegated to the ``getLogs`` method of the loaded logging plugin.

    :param logfile: log file name, handed to the plugin unchanged
    :param vo: VO name, handed to the plugin unchanged
    :return: the result of the plugin ``getLogs`` call
    :rtype: dict
    """

    plugin = self.loggingPlugin
    return plugin.getLogs(logfile, vo)

def export_finaliseLogs(self, payload, pilotUUID):
"""
Finalise a log file. Finalised logfile can be copied to a secure location, if a file cache is used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ def export_getJobPilotOutput(self, jobID):
job reference
:param str jobID: job ID
:return: S_OK(dict)/S_ERROR()
"""
pilotReference = ""
Expand Down

0 comments on commit efa94db

Please sign in to comment.