Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 62 additions & 58 deletions src/DIRAC/ProductionSystem/DB/ProductionDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,12 @@ def addProduction(self, prodName, prodDescription, author, authorGroup, connecti

self.lock.acquire()

req = (
"INSERT INTO Productions (ProductionName,Description,CreationDate,LastUpdate,\
req = "INSERT INTO Productions (ProductionName,Description,CreationDate,LastUpdate,\
Author,AuthorGroup,Status)\
VALUES ('%s','%s',UTC_TIMESTAMP(),UTC_TIMESTAMP(),'%s','%s','New');"
% (prodName, prodDescription, author, authorGroup)
)
VALUES (%s,%s,UTC_TIMESTAMP(),UTC_TIMESTAMP(),%s,%s,'New')"
args = (prodName, prodDescription, author, authorGroup)

res = self._update(req, conn=connection)
res = self._update(req, args=args, conn=connection)
if not res["OK"]:
self.lock.release()
return res
Expand Down Expand Up @@ -133,9 +131,12 @@ def getProductions(
:param bool connection:
:return:
"""
validTimestamps = {"lastupdate", "creationdate"}
if timeStamp.lower() not in validTimestamps:
return S_ERROR(f"Invalid timeStamp: {timeStamp}")

connection = self.__getConnection(connection)
req = "SELECT {} FROM Productions {}".format(
req = "SELECT {} FROM Productions {}".format( # nosec: B608
intListToString(self.PRODPARAMS),
self.buildCondition(condDict, older, newer, timeStamp, orderAttribute, limit, offset=offset),
)
Expand Down Expand Up @@ -208,8 +209,8 @@ def getProductionStep(self, stepID, connection=False):
:return: the attributes of Production Step corresponding to the stepID
"""
connection = self.__getConnection(connection)
req = f"SELECT {intListToString(self.PRODSTEPSPARAMS)} FROM ProductionSteps WHERE StepID = {str(stepID)}"
res = self._query(req, conn=connection)
req = f"SELECT {intListToString(self.PRODSTEPSPARAMS)} FROM ProductionSteps WHERE StepID = %s" # nosec
res = self._query(req, args=(str(stepID),), conn=connection)
if not res["OK"]:
return res
if not res["Value"]:
Expand Down Expand Up @@ -249,26 +250,24 @@ def addProductionStep(
"""
connection = self.__getConnection(connection)
self.lock.acquire()
req = (
"INSERT INTO ProductionSteps (Name,Description,LongDescription,Body,Type,Plugin,AgentType,GroupSize,\
req = "INSERT INTO ProductionSteps (Name,Description,LongDescription,Body,Type,Plugin,AgentType,GroupSize,\
InputQuery,OutputQuery,LastUpdate,InsertedTime)\
VALUES ('%s','%s', '%s', %s, '%s', '%s', '%s', '%s', '%s', '%s',\
VALUES (%s,%s, %s, %s, %s, %s, %s, %s, %s, %s,\
UTC_TIMESTAMP(),UTC_TIMESTAMP());"
% (
stepName,
stepDescription,
stepLongDescription,
stepBody,
stepType,
stepPlugin,
stepAgentType,
stepGroupSize,
stepInputquery,
stepOutputquery,
)
args = (
stepName,
stepDescription,
stepLongDescription,
stepBody,
stepType,
stepPlugin,
stepAgentType,
stepGroupSize,
stepInputquery,
stepOutputquery,
)

res = self._update(req, conn=connection)
res = self._update(req, args=args, conn=connection)
if not res["OK"]:
self.lock.release()
return res
Expand Down Expand Up @@ -300,14 +299,18 @@ def getProductionTransformations(
:param str prodName: the Production name or ID
:return:
"""
validTimestamps = {"creationtime", "creationdate"}
if timeStamp.lower() not in validTimestamps:
return S_ERROR(f"Invalid timeStamp: {timeStamp}")

res = self._getConnectionProdID(connection, prodName)
if not res["OK"]:
return res
connection = res["Value"]["Connection"]
prodID = res["Value"]["ProductionID"]
condDict = {"ProductionID": prodID}

req = "SELECT {} FROM ProductionTransformations {}".format(
req = "SELECT {} FROM ProductionTransformations {}".format( # nosec: B608
intListToString(self.TRANSPARAMS),
self.buildCondition(condDict, older, newer, timeStamp, orderAttribute, limit, offset=offset),
)
Expand All @@ -334,8 +337,8 @@ def __setProductionStatus(self, prodID, status, connection=False):
:param int prodID: the ProductionID
:param str status: the Production status
"""
req = "UPDATE Productions SET Status='%s', LastUpdate=UTC_TIMESTAMP() WHERE ProductionID=%d" % (status, prodID)
return self._update(req, conn=connection)
req = "UPDATE Productions SET Status=%s, LastUpdate=UTC_TIMESTAMP() WHERE ProductionID=%s"
return self._update(req, args=(status, str(prodID)), conn=connection)

# This is to be replaced by startProduction, stopProduction etc.
def setProductionStatus(self, prodName, status, connection=False):
Expand Down Expand Up @@ -422,8 +425,8 @@ def __deleteProduction(self, prodID, connection=False):

:param int prodID: ProductionID
"""
req = "DELETE FROM Productions WHERE ProductionID=%d;" % prodID
return self._update(req, conn=connection)
req = "DELETE FROM Productions WHERE ProductionID=%s"
return self._update(req, args=(str(prodID),), conn=connection)

def __deleteProductionTransformations(self, prodID, connection=False):
"""Remove all the transformations of the specified production from the TS and from the PS
Expand All @@ -438,13 +441,13 @@ def __deleteProductionTransformations(self, prodID, connection=False):
gLogger.error("Failed to delete production transformations from the TS", res["Message"])

# Remove transformations from the PS
req = "DELETE FROM ProductionTransformationLinks WHERE ProductionID = %d;" % prodID
res = self._update(req, conn=connection)
req = "DELETE FROM ProductionTransformationLinks WHERE ProductionID = %s"
res = self._update(req, args=(str(prodID),), conn=connection)
if not res["OK"]:
gLogger.error("Failed to delete production transformation links from the PS", res["Message"])

req = "DELETE FROM ProductionTransformations WHERE ProductionID = %d;" % prodID
res = self._update(req, conn=connection)
req = "DELETE FROM ProductionTransformations WHERE ProductionID = %s"
res = self._update(req, args=(str(prodID),), conn=connection)
if not res["OK"]:
gLogger.error("Failed to delete production transformations from the PS", res["Message"])

Expand Down Expand Up @@ -569,24 +572,23 @@ def __addTransformationLinks(self, prodID, transIDs, parentTransIDs=None, connec
:param prodName: the Production name or ID
:param transIDs: the list of transformations to be associated ProductionTransformationLinks table
"""
req = "INSERT INTO ProductionTransformationLinks \
(TransformationID,ParentTransformationID,ProductionID) VALUES"

# Insert transformations and the corresponding parent transformations
if parentTransIDs:
for transID in transIDs:
for parentTransID in parentTransIDs:
req = "%s (%d,%d,%d)," % (req, transID, parentTransID, prodID)
# If parent transformations are not defined, just insert transformations and use the parent transformation default
# value -1
cmd = (
"INSERT INTO ProductionTransformationLinks "
"(TransformationID,ParentTransformationID,ProductionID) "
"VALUES (%s, %s, %s)"
)
data = [(transID, parentTransID, prodID) for transID in transIDs for parentTransID in parentTransIDs]
else:
req = "INSERT INTO ProductionTransformationLinks (TransformationID,ProductionID) VALUES"
for transID in transIDs:
req = "%s (%d,%d)," % (req, transID, prodID)
cmd = (
"INSERT INTO ProductionTransformationLinks "
"(TransformationID,ParentTransformationID,ProductionID) "
"VALUES (%s, -1, %s)"
)
data = [(transID, prodID) for transID in transIDs]

gLogger.notice(req)
req = req.rstrip(",")
res = self._update(req, conn=connection)
res = self._updatemany(cmd, data, conn=connection)
if not res["OK"]:
return res

Expand All @@ -598,13 +600,14 @@ def __addTransformations(self, prodID, transIDs, connection=False):
:param str prodName: the Production name or ID
:param list transIDs: the list of transformations to be added to the production
"""
req = "INSERT INTO ProductionTransformations \
(ProductionID,TransformationID,LastUpdate,InsertedTime) VALUES"
for transID in transIDs:
req = "%s (%d,%d,UTC_TIMESTAMP(),UTC_TIMESTAMP())," % (req, prodID, transID)
gLogger.notice(req)
req = req.rstrip(",")
res = self._update(req, conn=connection)
cmd = (
"INSERT INTO ProductionTransformations "
"(ProductionID,TransformationID,LastUpdate,InsertedTime) "
"VALUES (%s, %s, UTC_TIMESTAMP(), UTC_TIMESTAMP())"
)
data = [(prodID, transID) for transID in transIDs]

res = self._updatemany(cmd, data, conn=connection)
if not res["OK"]:
return res

Expand All @@ -617,12 +620,13 @@ def _getProductionID(self, prodName, connection=False):
"""
try:
prodName = int(prodName)
cmd = "SELECT ProductionID from Productions WHERE ProductionID=%d;" % prodName
cmd = "SELECT ProductionID from Productions WHERE ProductionID=%s"
except Exception:
if not isinstance(prodName, str):
return S_ERROR("Production should be ID or name")
cmd = f"SELECT ProductionID from Productions WHERE ProductionName='{prodName}';"
res = self._query(cmd, conn=connection)
cmd = "SELECT ProductionID from Productions WHERE ProductionName=%s"
# prodName may have been cast to an int here
res = self._query(cmd, args=(str(prodName),), conn=connection)
if not res["OK"]:
gLogger.error("Failed to obtain production ID for production", f"{prodName}: {res['Message']}")
return res
Expand Down
Loading