Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[devel] feat: add log flushing with a timer. #217

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions Pilot/dirac-pilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,18 @@
log.info("Requested command extensions: %s" % str(pilotParams.commandExtensions))

log.info("Executing commands: %s" % str(pilotParams.commands))
if pilotParams.pilotLogging:
log.buffer.flush()

if pilotParams.pilotLogging:
# It's safer to cancel the timer here. Each command has got its own logger object with a timer cancelled by the
# finaliser. No need for a timer in the "else" code segment below.
try:
log.buffer.cancelTimer()
log.debug("Timer canceled")
log.buffer.flush()
except Exception as exc:
log.error(str(exc))
for commandName in pilotParams.commands:
command, module = getCommand(pilotParams, commandName, log)
command, module = getCommand(pilotParams, commandName)
if command is not None:
command.log.info("Command %s instantiated from %s" % (commandName, module))
command.execute()
Expand Down
17 changes: 14 additions & 3 deletions Pilot/pilotCommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(self, pilotParams):
import stat
import sys
import time
import traceback
from collections import Counter

############################
Expand Down Expand Up @@ -82,19 +83,29 @@ def wrapper(self):
return func(self)

try:
return func(self)
ret = func(self)
self.log.buffer.flush()
return ret

except SystemExit as exCode: # or Exception ?
# controlled exit
pRef = self.pp.pilotReference
self.log.info(
"Flushing the remote logger buffer for pilot on sys.exit(): %s (exit code:%s)" % (pRef, str(exCode))
)
self.log.buffer.flush() # flush the buffer unconditionally (on sys.exit() and return).
self.log.buffer.flush() # flush the buffer unconditionally (on sys.exit()).
try:
sendMessage(self.log.url, self.log.pilotUUID, "finaliseLogs", {"retCode": str(exCode)})
except Exception as exc:
self.log.error("Remote logger couldn't be finalised %s " % str(exc))
raise

except Exception as exc:
# unexpected exit: document it and bail out.
self.log.error(str(exc))
self.log.error(traceback.format_exc())
raise
finally:
self.log.buffer.cancelTimer()
return wrapper


Expand Down
91 changes: 70 additions & 21 deletions Pilot/pilotTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import threading
from datetime import datetime
from distutils.version import LooseVersion
from functools import partial
from functools import partial, wraps
from threading import RLock

############################
# python 2 -> 3 "hacks"
Expand Down Expand Up @@ -52,6 +53,12 @@
except NameError:
FileNotFoundError = OSError

# Timer 2.7 issue where Timer is a function
if sys.version_info.major == 2:
from threading import _Timer as Timer # pylint: disable=no-name-in-module
else:
from threading import Timer

# Utilities functions


Expand Down Expand Up @@ -215,7 +222,7 @@ def listdir(directory):

def getFlavour(ceName):

pilotReference = os.environ.get("DIRAC_PILOT_STAMP", '')
pilotReference = os.environ.get("DIRAC_PILOT_STAMP", "")
flavour = "DIRAC"

# # Batch systems
Expand Down Expand Up @@ -270,12 +277,7 @@ def getFlavour(ceName):
if "SSHBATCH_JOBID" in os.environ and "SSH_NODE_HOST" in os.environ:
flavour = "SSHBATCH"
pilotReference = (
"sshbatchhost://"
+ ceName
+ "/"
+ os.environ["SSH_NODE_HOST"]
+ "/"
+ os.environ["SSHBATCH_JOBID"]
"sshbatchhost://" + ceName + "/" + os.environ["SSH_NODE_HOST"] + "/" + os.environ["SSHBATCH_JOBID"]
)

# ARC
Expand Down Expand Up @@ -359,7 +361,7 @@ def loadObject(self, package, moduleName, command):
return None, None


def getCommand(params, commandName, log):
def getCommand(params, commandName):
"""Get an instantiated command object for execution.
Commands are looked in the following modules in the order:

Expand Down Expand Up @@ -453,7 +455,7 @@ def __init__(
pilotOutput="pilot.out",
isPilotLoggerOn=True,
pilotUUID="unknown",
setup="DIRAC-Certification",
flushInterval=10,
):
"""
c'tor
Expand All @@ -465,7 +467,7 @@ def __init__(
self.pilotUUID = pilotUUID
self.isPilotLoggerOn = isPilotLoggerOn
sendToURL = partial(sendMessage, url, pilotUUID, "sendMessage")
self.buffer = FixedSizeBuffer(sendToURL)
self.buffer = FixedSizeBuffer(sendToURL, autoflush=flushInterval)

def debug(self, msg, header=True, sendPilotLog=False):
super(RemoteLogger, self).debug(msg, header)
Expand Down Expand Up @@ -505,26 +507,51 @@ def sendMessage(self, msg):
super(RemoteLogger, self).error(str(err))


def synchronized(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
with self._rlock:
return func(self, *args, **kwargs)

return wrapper


class RepeatingTimer(Timer):
def run(self):
while not self.finished.wait(self.interval):
self.function(*self.args, **self.kwargs)


class FixedSizeBuffer(object):
"""
A buffer with a (preferred) fixed number of lines.
Once it's full, a message is sent to a remote server and the buffer is renewed.
"""

def __init__(self, senderFunc, bufsize=10):
def __init__(self, senderFunc, bufsize=10, autoflush=10):
"""
Constructor.

:param senderFunc: a function used to send a message
:type senderFunc: func
:param bufsize: size of the buffer (in lines)
:type bufsize: int
:param autoflush: buffer flush period in seconds
:type autoflush: int
"""

self._rlock = RLock()
if autoflush > 0:
self._timer = RepeatingTimer(autoflush, self.flush)
self._timer.start()
else:
self._timer = None
self.output = StringIO()
self.bufsize = bufsize
self.__nlines = 0
self._nlines = 0
self.senderFunc = senderFunc

@synchronized
def write(self, text):
"""
Write text to a string buffer. Newline characters are counted and number of lines in the buffer
Expand All @@ -539,36 +566,49 @@ def write(self, text):
if self.output.closed:
self.output = StringIO()
self.output.write(text)
self.__nlines += max(1, text.count("\n"))
self._nlines += max(1, text.count("\n"))
self.sendFullBuffer()

@synchronized
def getValue(self):
content = self.output.getvalue()
return content

@synchronized
def sendFullBuffer(self):
"""
Get the buffer content, send a message, close the current buffer and re-create a new one for subsequent writes.

"""

if self.__nlines >= self.bufsize:
if self._nlines >= self.bufsize:
self.flush()
self.output = StringIO()

@synchronized
def flush(self):
"""
Flush the buffer and send log records to a remote server. The buffer is closed as well.

:return: None
:rtype: None
"""
if not self.output.closed:
self.output.flush()
buf = self.getValue()
self.senderFunc(buf)
self._nlines = 0
self.output.close()

def cancelTimer(self):
"""
Cancel the repeating timer if it exists.

self.output.flush()
buf = self.getValue()
self.senderFunc(buf)
self.__nlines = 0
self.output.close()
:return: None
:rtype: None
"""
if self._timer is not None:
self._timer.cancel()


def sendMessage(url, pilotUUID, method, rawMessage):
Expand Down Expand Up @@ -614,13 +654,18 @@ def __init__(self, pilotParams, dummy=""):
isPilotLoggerOn = pilotParams.pilotLogging
self.debugFlag = pilotParams.debugFlag
loggerURL = pilotParams.loggerURL
interval = pilotParams.loggerTimerInterval

if loggerURL is None:
self.log = Logger(self.__class__.__name__, debugFlag=self.debugFlag)
else:
# remote logger
self.log = RemoteLogger(
loggerURL, self.__class__.__name__, pilotUUID=pilotParams.pilotUUID, debugFlag=self.debugFlag
loggerURL,
self.__class__.__name__,
pilotUUID=pilotParams.pilotUUID,
debugFlag=self.debugFlag,
flushInterval=interval,
)

self.log.isPilotLoggerOn = isPilotLoggerOn
Expand Down Expand Up @@ -804,6 +849,7 @@ def __init__(self):
self.pilotCFGFile = "pilot.json"
self.pilotLogging = False
self.loggerURL = None
self.loggerTimerInterval = 600
self.pilotUUID = "unknown"
self.modules = "" # see dirac-install "-m" option documentation
self.userEnvVariables = "" # see dirac-install "--userEnvVariables" option documentation
Expand Down Expand Up @@ -1053,11 +1099,14 @@ def __initJSON2(self):
if pilotLogging is not None:
self.pilotLogging = pilotLogging.upper() == "TRUE"
self.loggerURL = pilotOptions.get("RemoteLoggerURL")
# logger buffer flush interval in seconds.
self.loggerTimerInterval = pilotOptions.get("RemoteLoggerTimerInterval", self.loggerTimerInterval)
pilotLogLevel = pilotOptions.get("PilotLogLevel", "INFO")
if pilotLogLevel.lower() == "debug":
self.debugFlag = True
self.log.debug("JSON: Remote logging: %s" % self.pilotLogging)
self.log.debug("JSON: Remote logging URL: %s" % self.loggerURL)
self.log.debug("JSON: Remote logging buffer flush interval in sec.(0: disabled): %s" % self.loggerTimerInterval)
self.log.debug("JSON: Remote/local logging debug flag: %s" % self.debugFlag)

# CE type if present, then Defaults, otherwise as defined in the code:
Expand Down
13 changes: 10 additions & 3 deletions Pilot/tests/Test_simplePilotLogger.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import tempfile

try:
from Pilot.pilotTools import CommandBase, PilotParams
from Pilot.pilotTools import CommandBase, PilotParams, RemoteLogger
except ImportError:
from pilotTools import CommandBase, PilotParams
from pilotTools import CommandBase, PilotParams, RemoteLogger

import unittest

Expand Down Expand Up @@ -144,7 +144,14 @@ def test_executeAndGetOutput(self, popenMock, argvmock):
self.stderr_mock.write("Errare humanum est!")
self.stderr_mock.seek(0)
pp = PilotParams()
cBase = CommandBase(pp)
try:
cBase = CommandBase(pp)
# we have a logger URL set, so:
assert isinstance(cBase.log, RemoteLogger)
finally:
# and cancel the timer !
cBase.log.buffer.cancelTimer()

popenMock.return_value.stdout = self.stdout_mock
popenMock.return_value.stderr = self.stderr_mock
outData = cBase.executeAndGetOutput("dummy")
Expand Down