Skip to content

Commit

Permalink
added prompusher
Browse files Browse the repository at this point in the history
  • Loading branch information
voetberg committed Feb 12, 2024
1 parent 86f8be2 commit 2ab81e0
Showing 1 changed file with 80 additions and 56 deletions.
136 changes: 80 additions & 56 deletions common/check_obsolete_replicas
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# Copyright European Organization for Nuclear Research (CERN) 2013
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -8,77 +8,101 @@
# Authors:
# - Vincent Garonne, <[email protected]>, 2015
# - Cedric Serfon, <[email protected]>, 2018
# - Maggie Voetberg, <[email protected]>, 2024

'''
Probe to check the backlog of obsolete replicas.
'''

import sys
import traceback
from sqlalchemy.sql import text
from rucio.db.sqla.session import BASE, get_session
from utils.common import PrometheusPusher

from rucio.db.sqla.session import get_session
if BASE.metadata.schema:
schema = BASE.metadata.schema + '.'
else:
schema = ''

# Exit statuses
OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3


if __name__ == "__main__":
try:
SESSION = get_session()
QUERY = '''BEGIN
FOR u in (SELECT
a.rse_id AS rse_id,
NVL(b.files, 0) AS files,
NVL(b.bytes, 0) AS bytes,
SYS_EXTRACT_UTC(localtimestamp) AS updated_at
FROM
(
SELECT
id AS rse_id
FROM
atlas_rucio.rses
WHERE
deleted=0) a
LEFT OUTER JOIN
(
SELECT
/*+ INDEX_FFS(replicas REPLICAS_TOMBSTONE_IDX) */
rse_id,
COUNT(1) AS files,
SUM(bytes) AS bytes
FROM
atlas_rucio.replicas
WHERE
(
CASE
WHEN tombstone IS NOT NULL
THEN rse_id
END) IS NOT NULL
AND tombstone=to_date('1-1-1970 00:00:00','MM-DD-YYYY HH24:Mi:SS')
GROUP BY
rse_id) b
ON
a.rse_id=b.rse_id)
session = get_session()
with PrometheusPusher() as manager:
query = '''BEGIN
FOR u in (SELECT
a.rse_id AS rse_id,
NVL(b.files, 0) AS files,
NVL(b.bytes, 0) AS bytes,
SYS_EXTRACT_UTC(localtimestamp) AS updated_at
FROM
(
SELECT
id AS rse_id
FROM
{schema}rses
WHERE
deleted=0) a
LEFT OUTER JOIN
(
SELECT
/*+ INDEX_FFS(replicas REPLICAS_TOMBSTONE_IDX) */
rse_id,
COUNT(1) AS files,
SUM(bytes) AS bytes
FROM
{schema}replicas
WHERE
(
CASE
WHEN tombstone IS NOT NULL
THEN rse_id
END) IS NOT NULL
AND tombstone=to_date('1-1-1970 00:00:00','MM-DD-YYYY HH24:Mi:SS')
GROUP BY
rse_id) b
ON
a.rse_id=b.rse_id)
LOOP
MERGE INTO atlas_rucio.RSE_USAGE
USING DUAL
ON (atlas_rucio.RSE_USAGE.rse_id = u.rse_id and source = 'obsolete')
WHEN NOT MATCHED THEN INSERT(rse_id, source, used, files, updated_at, created_at)
VALUES (u.rse_id, 'obsolete', u.bytes, u.files, u.updated_at, u.updated_at)
WHEN MATCHED THEN UPDATE SET used=u.bytes, files=u.files, updated_at=u.updated_at;
LOOP
MERGE INTO {schema}RSE_USAGE
USING DUAL
ON ({schema}RSE_USAGE.rse_id = u.rse_id and source = 'obsolete')
WHEN NOT MATCHED THEN INSERT(rse_id, source, used, files, updated_at, created_at)
VALUES (u.rse_id, 'obsolete', u.bytes, u.files, u.updated_at, u.updated_at)
WHEN MATCHED THEN UPDATE SET used=u.bytes, files=u.files, updated_at=u.updated_at;
MERGE INTO ATLAS_RUCIO.RSE_USAGE_HISTORY H
USING DUAL
ON (h.rse_id = u.rse_id and h.source = 'obsolete' and h.updated_at = u.updated_at)
WHEN NOT MATCHED THEN INSERT(rse_id, source, used, files, updated_at, created_at)
VALUES (u.rse_id, 'obsolete', u.bytes, u.files, u.updated_at, u.updated_at);
MERGE INTO {schema}RSE_USAGE_HISTORY H
USING DUAL
ON (h.rse_id = u.rse_id and h.source = 'obsolete' and h.updated_at = u.updated_at)
WHEN NOT MATCHED THEN INSERT(rse_id, source, used, files, updated_at, created_at)
VALUES (u.rse_id, 'obsolete', u.bytes, u.files, u.updated_at, u.updated_at);
COMMIT;
END LOOP;
END;
'''
SESSION.execute(QUERY)
except Exception as error:
print error
COMMIT;
END LOOP;
END;'''.format(schema=schema)

for result in session.execute(text(query)):
print(result)

rse_id = result[0]
bytes_sum = result[2]
files_count = result[3]

manager.gauge(name="obsolete_replicas_files.{rse}",
documentation="Probe to check the backlog of obsolete replicas.").labels(rse=rse_id).set(files_count)

manager.gauge(name="obsolete_replicas_bytes.{rse}",
documentation="Probe to check the backlog of obsolete replicas.").labels().set(bytes_sum)


except:
print(traceback.format_exc())
sys.exit(UNKNOWN)
finally:
session.remove()
sys.exit(OK)

0 comments on commit 2ab81e0

Please sign in to comment.