Skip to content

Commit

Permalink
Adding deployment ID to db and Tier0Config (#4632)
Browse files Browse the repository at this point in the history
  • Loading branch information
germanfgv authored Nov 12, 2021
1 parent 958c1b4 commit 05be43e
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 9 deletions.
3 changes: 1 addition & 2 deletions etc/ReplayOfflineConfiguration.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
"""
from __future__ import print_function

import datetime
from T0.RunConfig.Tier0Config import addDataset
from T0.RunConfig.Tier0Config import createTier0Config
from T0.RunConfig.Tier0Config import setAcquisitionEra
Expand Down Expand Up @@ -118,7 +117,7 @@
hiTestppScenario = "ppEra_Run3"

# Procesing version number replays
dt = int(datetime.datetime.now().strftime("%y%m%d%H%M"))
dt = 1
defaultProcVersion = dt
expressProcVersion = dt
alcarawProcVersion = dt
Expand Down
14 changes: 7 additions & 7 deletions src/python/T0/RunConfig/RunConfigAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,8 +554,8 @@ def configureRunStream(tier0Config, run, stream, specDirectory, dqmUploadProxy):
taskName = "Repack"

if tier0Config.Global.EnableUniqueWorkflowName:
workflowName = "Repack_Run%d_Stream%s_%s_v%s_%s" % (run, stream,
tier0Config.Global.AcquisitionEra, streamConfig.Repack.ProcessingVersion,
workflowName = "Repack_Run%d_Stream%s_%s_ID%d_v%s_%s" % (run, stream,
tier0Config.Global.AcquisitionEra, tier0Config.Global.DeploymentID, streamConfig.Repack.ProcessingVersion,
time.strftime('%y%m%d_%H%M', time.localtime(time.time())))
else:
workflowName = "Repack_Run%d_Stream%s" % (run, stream)
Expand Down Expand Up @@ -605,8 +605,8 @@ def configureRunStream(tier0Config, run, stream, specDirectory, dqmUploadProxy):
taskName = "Express"

if tier0Config.Global.EnableUniqueWorkflowName:
workflowName = "Express_Run%d_Stream%s_%s_v%s_%s" % (run, stream,
tier0Config.Global.AcquisitionEra, streamConfig.Express.ProcessingVersion,
workflowName = "Express_Run%d_Stream%s_%s_ID%d_v%s_%s" % (run, stream,
tier0Config.Global.AcquisitionEra, tier0Config.Global.DeploymentID, streamConfig.Express.ProcessingVersion,
time.strftime('%y%m%d_%H%M', time.localtime(time.time())))
else:
workflowName = "Express_Run%d_Stream%s" % (run, stream)
Expand Down Expand Up @@ -1036,8 +1036,8 @@ def releasePromptReco(tier0Config, specDirectory, dqmUploadProxy):
taskName = "Reco"

if tier0Config.Global.EnableUniqueWorkflowName:
workflowName = "PromptReco_Run%d_%s_%s_v%s_%s" % (run, dataset,
tier0Config.Global.AcquisitionEra, datasetConfig.ProcessingVersion,
workflowName = "PromptReco_Run%d_%s_%s_ID%d_v%s_%s" % (run, dataset,
tier0Config.Global.AcquisitionEra, tier0Config.Global.DeploymentID, datasetConfig.ProcessingVersion,
time.strftime('%y%m%d_%H%M', time.localtime(time.time())))
else:
workflowName = "PromptReco_Run%d_%s" % (run, dataset)
Expand Down Expand Up @@ -1162,4 +1162,4 @@ def releasePromptReco(tier0Config, specDirectory, dqmUploadProxy):
else:
myThread.transaction.commit()

return
return
13 changes: 13 additions & 0 deletions src/python/T0/RunConfig/Tier0Config.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
| | |--> DefaultScramArch - Default ScramArch if nothing else is specified for release
| | |
| | |--> BaseRequestPriority - Base for request priorities for PromptReco/Repack/Express
| | |
| | |--> DeploymentID - Unique identifier for every T0 Agent deployment
| |
| |
| |--> Streams - Configuration parameters that belong to a particular stream
Expand Down Expand Up @@ -268,6 +270,8 @@ def createTier0Config():

tier0Config.Global.EnableUniqueWorkflowName = False

tier0Config.Global.DeploymentID = 1

return tier0Config

def retrieveStreamConfig(config, streamName):
Expand Down Expand Up @@ -747,6 +751,15 @@ def setEnableUniqueWorkflowName(config):
config.Global.EnableUniqueWorkflowName = True
return

def setDeploymentId(config, id):
"""
_setDeploymentId_
Sets an ID for the current deployment of T0
"""
config.Global.DeploymentID = id
return

def ignoreStream(config, streamName):
"""
_ignoreStream_
Expand Down
8 changes: 8 additions & 0 deletions src/python/T0/WMBS/Oracle/Create.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ def __init__(self, logger = None, dbi = None, params = None):
primary key(run_id)
) ORGANIZATION INDEX"""

self.create[len(self.create)] = \
"""CREATE TABLE t0_deployment_id (
name varchar2(15) not null,
id int not null,
primary key(name),
constraint CK_DEPLOY_ID CHECK (name='deployment_id')
) ORGANIZATION INDEX"""

self.create[len(self.create)] = \
"""CREATE TABLE run_status (
id int not null,
Expand Down
22 changes: 22 additions & 0 deletions src/python/T0/WMBS/Oracle/Tier0Feeder/GetDeploymentID.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""
_GetDeploymentID_
Oracle implementation of GetDeploymentID
Retrieves T0 deployment ID
"""
from WMCore.Database.DBFormatter import DBFormatter

class GetDeploymentID(DBFormatter):

def execute(self, conn = None, transaction = False):

sql = """SELECT id
from t0_deployment_id"""

results = self.dbi.processData(sql, {}, conn = conn,
transaction = transaction)[0].fetchall()
id = 0
if results:
id=results[0][0]

return id
21 changes: 21 additions & 0 deletions src/python/T0/WMBS/Oracle/Tier0Feeder/SetDeploymentID.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""
_SetDeploymentID_
Oracle implementation of SetDeploymentID
Sets T0 deployment ID
"""
from WMCore.Database.DBFormatter import DBFormatter

class SetDeploymentID(DBFormatter):

def execute(self, id, conn = None, transaction = False):

sql = """INSERT INTO t0_deployment_id
(name, id)
VALUES ('deployment_id', :ID)"""

binds = {"ID" : id}
results = self.dbi.processData(sql, binds, conn = conn,
transaction = transaction)

return
23 changes: 23 additions & 0 deletions src/python/T0Component/Tier0Feeder/Tier0FeederPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging
import threading
import subprocess
import datetime

from Utils.Timers import timeFunction
from WMCore.WorkerThreads.BaseWorkerThread import BaseWorkerThread
Expand All @@ -28,6 +29,7 @@
from T0.RunLumiCloseout import RunLumiCloseoutAPI
from T0.ConditionUpload import ConditionUploadAPI
from T0.StorageManager import StorageManagerAPI
from T0.RunConfig.Tier0Config import setDeploymentId


class Tier0FeederPoller(BaseWorkerThread):
Expand Down Expand Up @@ -98,6 +100,22 @@ def __init__(self, config):
logger = logging,
dbinterface = dbInterfaceT0DataSvc)

#
# Set deployment ID
#

SetDeploymentIdDAO = self.daoFactory(classname = "Tier0Feeder.SetDeploymentID")
GetDeploymentIdDAO = self.daoFactory(classname = "Tier0Feeder.GetDeploymentID")
try:
self.deployID = GetDeploymentIdDAO.execute()
if self.deployID == 0:
self.deployID = int(datetime.datetime.now().strftime("%y%m%d%H%M%S"))
SetDeploymentIdDAO.execute(self.deployID)

except:
logging.exception("Something went wrong with setting deployment ID")
raise

return

@timeFunction
Expand Down Expand Up @@ -158,6 +176,11 @@ def algorithm(self, parameters = None):
# shouldn't happen, just a catch all insurance
logging.exception("Something went wrong with data retrieval from StorageManager")

#
# Set deployment ID
#
setDeploymentId(tier0Config, self.deployID)
logging.info("Deploy ID: %d" % tier0Config.Global.DeploymentID)

#
# find new runs, setup global run settings and stream/dataset/trigger mapping
Expand Down

0 comments on commit 05be43e

Please sign in to comment.