Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sweep:integration] feat (TS): add getTransformationFilesAsJsonString #7843

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/source/DeveloperGuide/CodeTesting/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Testing itself could also speed up the development process rapidly tracing probl
DIRAC is not different from that scenario, with the exception that service-oriented architecture paradigm,
which is one of the basic concepts of the project, making the quality assurance and testing process the real challenge.
However as DIRAC becomes more and more popular and now is being used by several different communities,
the main question is not: *to test or not to test?*, but rather: *how to test in an efficient way?*
the main question is not: *to test or not to test?*, but rather: *how to test in an efficient way?* [#]_.

The topic of software testing is very complicated by its own nature, but depending on the testing method employed,
the testing process itself can be implemented at any time in the development phase and ideally should cover many different levels of the system:
Expand Down Expand Up @@ -89,13 +89,13 @@ This could be obtained by objects mocking technique, where all fragile component
equivalents - test doubles. For that it is recommended to use mock_ module.
Hence it is clear that knowledge of mock_ module API is essential.

Unit tests are typically created by the developer who will also write the code that is being tested. The tests may therefore share the same blind spots with the code: for example, a developer does not realize that certain input parameters must be checked, most likely neither the test nor the code will verify these input parameters. If the developer misinterprets the requirements specification for the module being developed, both the tests and the code will be wrong. Hence if the developer is going to prepare her own unit tests, she should pay attention and take extra care to implement proper testing suite, checking for every spot of possible failure (i.e. interactions with other components) and not trusting that someone else's code is always returning proper type and/or values.
Unit tests are typically created by the developer who will also write the code that is being tested. The tests may therefore share the same blind spots with the code: for example, a developer does not realize that certain input parameters must be checked, most likely neither the test nor the code will verify these input parameters [#]_. If the developer misinterprets the requirements specification for the module being developed, both the tests and the code will be wrong. Hence if the developer is going to prepare her own unit tests, she should pay attention and take extra care to implement proper testing suite, checking for every spot of possible failure (i.e. interactions with other components) and not trusting that someone else's code is always returning proper type and/or values.


Test doubles
============

Unit tests should run in *isolation*. Which means that they should run without having DIRAC fully installed, because, remember, they should just test the code logic. If, to run a unit test in DIRAC, you need a dirac.cfg file to be present, you are failing your goal.
Unit tests should run in *isolation*. Which means that they should run without having DIRAC fully installed, because, remember, they should just test the code logic. If, to run a unit test in DIRAC, you need a dirac.cfg file to be present, you are failing your goal [#]_.

To isolate the code being tested from depended-on components it is convenient and sometimes necessary to use *test doubles*:
simplified objects or procedures, that behaves and looks like the their real-intended counterparts, but are actually simplified versions
Expand Down Expand Up @@ -371,9 +371,9 @@ Footnotes
============

.. [#] Or even better software requirements document, if any of such exists. Otherwise this is a great opportunity to prepare one.
.. [#] You may ask: *isn't it silly?* No, in fact it isn't. Validation of input parameters is one of the most important tasks during testing.
.. [#] To better understand this term, think about a movie industry: if a scene movie makers are going to film is potentially dangerous and unsafe for the leading actor, his place is taken over by a stunt double.
.. [#] And eventually is killing him with a gun. At least in a TV show.
.. [#] You may ask: *isn't it silly?* No, in fact it isn't. Validation of input parameters is one of the most important tasks during testing.


.. _Python: http://www.python.org/
Expand Down
22 changes: 12 additions & 10 deletions src/DIRAC/Core/DISET/private/Transports/BaseTransport.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class BaseTransport:
def __init__(self, stServerAddress, bServerMode=False, **kwargs):
self.bServerMode = bServerMode
self.extraArgsDict = kwargs
self.byteStream = b""
self.byteStream = bytearray()
self.packetSize = 1048576 # 1MiB
self.stServerAddress = stServerAddress
self.peerCredentials = {}
Expand Down Expand Up @@ -191,7 +191,7 @@ def receiveData(self, maxBufferSize=0, blockAfterKeepAlive=True, idleReceive=Fal
maxBufferSize = max(maxBufferSize, 0)
try:
# Look either for message length of keep alive magic string
iSeparatorPosition = self.byteStream.find(b":", 0, 10)
iSeparatorPosition = self.byteStream.find(b":")
keepAliveMagicLen = len(BaseTransport.keepAliveMagic)
isKeepAlive = self.byteStream.find(BaseTransport.keepAliveMagic, 0, keepAliveMagicLen) == 0
# While not found the message length or the ka, keep receiving
Expand All @@ -204,17 +204,18 @@ def receiveData(self, maxBufferSize=0, blockAfterKeepAlive=True, idleReceive=Fal
if not retVal["Value"]:
return S_ERROR("Peer closed connection")
# New data!
self.byteStream += retVal["Value"]
# Look again for either message length of ka magic string
iSeparatorPosition = self.byteStream.find(b":", 0, 10)
self.byteStream.extend(retVal["Value"])

# Look again for either message length or keep alive magic string
iSeparatorPosition = self.byteStream.find(b":")
isKeepAlive = self.byteStream.find(BaseTransport.keepAliveMagic, 0, keepAliveMagicLen) == 0
# Over the limit?
if maxBufferSize and len(self.byteStream) > maxBufferSize and iSeparatorPosition == -1:
return S_ERROR(f"Read limit exceeded ({maxBufferSize} chars)")
# Keep alive magic!
if isKeepAlive:
gLogger.debug("Received keep alive header")
# Remove the ka magic from the buffer and process the keep alive
# Remove the keep-alive magic from the buffer and process the keep-alive
self.byteStream = self.byteStream[keepAliveMagicLen:]
return self.__processKeepAlive(maxBufferSize, blockAfterKeepAlive)
# From here it must be a real message!
Expand All @@ -225,7 +226,7 @@ def receiveData(self, maxBufferSize=0, blockAfterKeepAlive=True, idleReceive=Fal
if readSize >= pkgSize:
# If we already have all the data we need
data = pkgData[:pkgSize]
self.byteStream = pkgData[pkgSize:]
self.byteStream = self.byteStream[pkgSize + iSeparatorPosition + 1 :]
else:
# If we still need to read stuff
pkgMem = BytesIO()
Expand All @@ -245,11 +246,12 @@ def receiveData(self, maxBufferSize=0, blockAfterKeepAlive=True, idleReceive=Fal
# Data is here! take it out from the bytestream, dencode and return
if readSize == pkgSize:
data = pkgMem.getvalue()
self.byteStream = b""
else: # readSize > pkgSize:
self.byteStream = bytearray() # Reset the byteStream
else:
pkgMem.seek(0, 0)
data = pkgMem.read(pkgSize)
self.byteStream = pkgMem.read()
self.byteStream = bytearray(pkgMem.read()) # store the rest in bytearray

try:
data = MixedEncode.decode(data)[0]
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion src/DIRAC/Core/Utilities/DEncode.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ def decode(data):
if not data:
return data
# print("DECODE FUNCTION : %s" % g_dDecodeFunctions[ sStream [ iIndex ] ])
if not isinstance(data, bytes):
if not isinstance(data, (bytes, bytearray)):
raise NotImplementedError("This should never happen")
return g_dDecodeFunctions[data[0]](data, 0)

Expand Down
14 changes: 6 additions & 8 deletions src/DIRAC/TransformationSystem/Client/TransformationCLI.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,12 @@ def do_getFiles(self, args):
selectDict = {"TransformationID": res["Value"]["TransformationID"]}
if status:
selectDict["Status"] = status
res = self.transClient.getTransformationFiles(condDict=selectDict)
columns = ["LFN", "Status", "ErrorCount", "TargetSE", "LastUpdate"]
res = self.transClient.getTransformationFiles(condDict=selectDict, columns=columns)
if not res["OK"]:
print(f"Failed to get transformation files: {res['Message']}")
elif res["Value"]:
self._printFormattedDictList(
res["Value"], ["LFN", "Status", "ErrorCount", "TargetSE", "LastUpdate"], "LFN", "LFN"
)
self._printFormattedDictList(res["Value"], columns, "LFN", "LFN")
else:
print("No files found")

Expand All @@ -367,7 +366,8 @@ def do_getFileStatus(self, args):
print(f"Failed to get transformation information: {res['Message']}")
else:
selectDict = {"TransformationID": res["Value"]["TransformationID"]}
res = self.transClient.getTransformationFiles(condDict=selectDict)
columns = ["LFN", "Status", "ErrorCount", "TargetSE", "LastUpdate"]
res = self.transClient.getTransformationFiles(condDict=selectDict, columns=columns)
if not res["OK"]:
print(f"Failed to get transformation files: {res['Message']}")
elif res["Value"]:
Expand All @@ -376,9 +376,7 @@ def do_getFileStatus(self, args):
if fileDict["LFN"] in lfns:
filesList.append(fileDict)
if filesList:
self._printFormattedDictList(
filesList, ["LFN", "Status", "ErrorCount", "TargetSE", "LastUpdate"], "LFN", "LFN"
)
self._printFormattedDictList(filesList, columns, "LFN", "LFN")
else:
print("Could not find any LFN in", lfns, "for transformation", transName)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from DIRAC import S_OK, S_ERROR, gLogger
from DIRAC.Core.Base.Client import Client, createClient
from DIRAC.Core.Utilities.List import breakListIntoChunks
from DIRAC.Core.Utilities.JEncode import decode as jdecode
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.TransformationSystem.Client import TransformationStatus
from DIRAC.TransformationSystem.Client import TransformationFilesStatus
Expand Down Expand Up @@ -179,6 +180,9 @@ def getTransformationFiles(
res = rpcClient.getTransformationFiles(
condDict, older, newer, timeStamp, orderAttribute, offset, maxfiles, columns
)
if not res["OK"]:
return res
res, _ = jdecode(res["Value"])
# TransformationDB.getTransformationFiles includes a "Records"/"ParameterNames"
# that we don't want to return to the client so explicitly return S_OK with the value
if not res["OK"]:
Expand Down Expand Up @@ -207,6 +211,9 @@ def getTransformationFiles(
res = rpcClient.getTransformationFiles(
condDict, older, newer, timeStamp, orderAttribute, None, None, columns
)
if not res["OK"]:
return res
res, _ = jdecode(res["Value"])
if not res["OK"]:
gLogger.error(
"Error getting files for transformation %s (offset %d), %s"
Expand Down
11 changes: 1 addition & 10 deletions src/DIRAC/TransformationSystem/DB/TransformationDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,12 +632,8 @@ def getTransformationFiles(
return res

resultList = [dict(zip(columns, row)) for row in res["Value"]]
webList = [[str(item) if not isinstance(item, int) else item for item in row] for row in res["Value"]]

result = S_OK(resultList)
result["Records"] = webList
result["ParameterNames"] = columns
return result
return S_OK(resultList)

def getFileSummary(self, lfns, connection=False):
"""Get file status summary in all the transformations"""
Expand Down Expand Up @@ -888,13 +884,9 @@ def getTransformationTasks(
return res
if condDict is None:
condDict = {}
webList = []
resultList = []
for row in res["Value"]:
# Prepare the structure for the web
rList = [str(item) if not isinstance(item, int) else item for item in row]
taskDict = dict(zip(self.TASKSPARAMS, row))
webList.append(rList)
if inputVector:
taskDict["InputVector"] = ""
taskID = taskDict["TaskID"]
Expand All @@ -907,7 +899,6 @@ def getTransformationTasks(
return res
resultList.append(taskDict)
result = S_OK(resultList)
result["Records"] = webList
fstagni marked this conversation as resolved.
Show resolved Hide resolved
result["ParameterNames"] = self.TASKSPARAMS
return result

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
""" Service for interacting with TransformationDB
"""
from DIRAC import S_OK, S_ERROR

from DIRAC import S_ERROR, S_OK
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.Core.Security.Properties import SecurityProperty
from DIRAC.Core.Utilities.Decorators import deprecated
from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning
from DIRAC.Core.Utilities.JEncode import encode as jencode
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.TransformationSystem.Client import TransformationFilesStatus
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.RequestManagementSystem.Client.Operation import Operation


TASKS_STATE_NAMES = ["TotalCreated", "Created"] + sorted(
set(JobStatus.JOB_STATES) | set(Request.ALL_STATES) | set(Operation.ALL_STATES)
Expand Down Expand Up @@ -293,7 +295,7 @@ def export_getTransformationFiles(
):
if not condDict:
condDict = {}
return self.transformationDB.getTransformationFiles(
result = self.transformationDB.getTransformationFiles(
condDict=condDict,
older=older,
newer=newer,
Expand All @@ -305,6 +307,20 @@ def export_getTransformationFiles(
columns=columns,
)

# DEncode cannot cope with nested structures of multiple millions items.
# Encode everything as a json string, that DEncode can then transmit faster.

return S_OK(jencode(result))

types_getTransformationFilesAsJsonString = types_getTransformationFiles

@deprecated("Use getTransformationFiles instead")
def export_getTransformationFilesAsJsonString(self, *args, **kwargs):
"""
Deprecated call -- redirect to getTransformationFiles
"""
return self.export_getTransformationFiles(*args, **kwargs)

####################################################################
#
# These are the methods to manipulate the TransformationTasks table
Expand Down
Loading