diff --git a/DMOps/file_invalidation_tool/.gitignore b/DMOps/file_invalidation_tool/.gitignore new file mode 100644 index 00000000..fb293791 --- /dev/null +++ b/DMOps/file_invalidation_tool/.gitignore @@ -0,0 +1 @@ +secrets/* \ No newline at end of file diff --git a/DMOps/file_invalidation_tool/Dockerfile b/DMOps/file_invalidation_tool/Dockerfile new file mode 100644 index 00000000..830b480f --- /dev/null +++ b/DMOps/file_invalidation_tool/Dockerfile @@ -0,0 +1,34 @@ +FROM registry.cern.ch/cmsmonitoring/cmsmon-spark:v0.5.0.1 + +# Set environment variables +ENV PYCURL_SSL_LIBRARY=nss \ + X509_USER_CERT=/certs/usercert.pem \ + X509_USER_KEY=/certs/userkey.pem \ + RUCIO_CONFIG=/cvmfs/cms.cern.ch/rucio/rucio.cfg \ + RUCIO_ACCOUNT=transfer_ops \ + DRIVERPORT=5001 \ + BMPORT=5002 \ + UIPORT=5003 + +# Install dependencies for Rucio, DBS and Gfal + +RUN dnf install -y libcurl-devel openssl-devel libffi-devel ca-policy-egi-core \ + && dnf install -y gfal2-all python3-gfal2 python3-gfal2-util \ + && dnf install -y cmake gfal2-devel libcurl-devel \ + && dnf -y groupinstall "Development Tools" || true \ + && pip3 install cx-Oracle SQLAlchemy==1.4.49 dbs3-client rucio-clients \ + && pip3 install --compile --global-option="--with-nss" --no-cache-dir pycurl + +# Expose ports +EXPOSE 5001 +EXPOSE 5002 +EXPOSE 5003 + +# Copy code +COPY ./src /src/ + +# Set working directory +WORKDIR /src + +# Set entrypoint +ENTRYPOINT ["python3", "run_invalidations.py"] \ No newline at end of file diff --git a/DMOps/file_invalidation_tool/input_examples/checksum_val.csv b/DMOps/file_invalidation_tool/input_examples/checksum_val.csv new file mode 100644 index 00000000..ad53ead1 --- /dev/null +++ b/DMOps/file_invalidation_tool/input_examples/checksum_val.csv @@ -0,0 +1,16 @@ +FILENAME,RSE_EXPRESSION +/store/mc/RunIISummer20UL18MiniAODv2/GluGluToBulkGravitonToHHTo2B2Tau_M-400_TuneCP5_PSWeights_narrow_13TeV-madgraph-pythia8/MINIAODSIM/106X_upgrade2018_realistic_v16_L1v1-v2/2520000/06DDDBA7-B312-2E46-A22A-C76AF34D192C.root,rse_type=DISK +/store/mc/RunIISummer20UL18MiniAODv2/ZH_HToBB_ZToQQ_M-125_TuneCP5_13TeV-powheg-pythia8/MINIAODSIM/106X_upgrade2018_realistic_v16_L1v1-v2/2430000/EC177528-23FC-FB4E-9010-473CA26570AC.root,rse_type=DISK +/store/mc/RunIISummer20UL18MiniAODv2/ZHToTauTau_M125_CP5_13TeV-powheg-pythia8_ext1/MINIAODSIM/106X_upgrade2018_realistic_v16_L1v1-v2/80000/EF503A5C-98E7-434C-9DD9-75CE61BDEFAB.root,rse_type=DISK +/store/mc/RunIISummer20UL18MiniAODv2/HWplusJ_HToWW_M-125_TuneCP5_13TeV-powheg-jhugen727-pythia8/MINIAODSIM/106X_upgrade2018_realistic_v16_L1v1-v2/70000/591EF774-ADD7-B741-9074-93F25C3667BC.root,rse_type=DISK +/store/mc/RunIISummer20UL18MiniAODv2/HWplusJ_HToWW_M-125_TuneCP5_13TeV-powheg-jhugen727-pythia8/MINIAODSIM/106X_upgrade2018_realistic_v16_L1v1-v2/70000/51474CA2-DAF8-0C43-B360-58D0F14F4A1D.root,rse_type=DISK +/store/mc/RunIISummer20UL18MiniAODv2/QCD_HT300to500_TuneCP5_PSWeights_13TeV-madgraph-pythia8/MINIAODSIM/106X_upgrade2018_realistic_v16_L1v1-v1/30000/979ACFAB-ED97-3A41-A809-EBF77A5146DA.root,rse_type=DISK +/store/mc/RunIISummer20UL18MiniAODv2/QCD_Pt-470To600_MuEnrichedPt5_TuneCP5_13TeV-pythia8/MINIAODSIM/106X_upgrade2018_realistic_v16_L1v1-v2/260000/91B1AB52-B822-8541-B016-5232BF621F98.root,rse_type=DISK +/store/mc/RunIISummer20UL18MiniAODv2/ttHTobb_M125_TuneCP5_13TeV-powheg-pythia8/MINIAODSIM/106X_upgrade2018_realistic_v16_L1v1-v2/250000/890C5D33-E0F6-094C-8510-8DDC40D2B742.root,rse_type=DISK +/store/mc/RunIISummer20UL18MiniAODv2/ttHTobb_M125_TuneCP5_13TeV-powheg-pythia8/MINIAODSIM/106X_upgrade2018_realistic_v16_L1v1-v2/250000/1694A7CA-F3DA-8140-B01D-98B828E5EB20.root,rse_type=DISK +/store/mc/RunIISummer20UL18MiniAODv2/ttHTobb_M125_TuneCP5_13TeV-powheg-pythia8/MINIAODSIM/106X_upgrade2018_realistic_v16_L1v1-v2/250000/89F8B638-7285-AC44-9DFF-B78C3177E6EE.root,rse_type=DISK +/store/mc/RunIISummer20UL18MiniAODv2/ttHTobb_M125_TuneCP5_13TeV-powheg-pythia8/MINIAODSIM/106X_upgrade2018_realistic_v16_L1v1-v2/250000/BF5A2984-E146-8843-B60F-E5C29888FBBE.root,rse_type=DISK +/store/mc/RunIISummer20UL18MiniAODv2/ttHTobb_M125_TuneCP5_13TeV-powheg-pythia8/MINIAODSIM/106X_upgrade2018_realistic_v16_L1v1-v2/250000/6330270D-F058-F840-AF9A-F69530956178.root,rse_type=DISK +/store/mc/RunIISummer20UL18MiniAODv2/ttHTobb_M125_TuneCP5_13TeV-powheg-pythia8/MINIAODSIM/106X_upgrade2018_realistic_v16_L1v1-v2/250000/7729B11E-A1FE-E845-8741-36CAF547AC02.root,rse_type=DISK +/store/mc/RunIISummer20UL18MiniAODv2/ttHTobb_M125_TuneCP5_13TeV-powheg-pythia8/MINIAODSIM/106X_upgrade2018_realistic_v16_L1v1-v2/250000/4DAFAEEE-245F-CA42-A6B0-EFAF2FDD464B.root,rse_type=DISK +/store/mc/RunIISummer20UL18MiniAODv2/ttHTobb_M125_TuneCP5_13TeV-powheg-pythia8/MINIAODSIM/106X_upgrade2018_realistic_v16_L1v1-v2/250000/D61AE12C-A68A-754A-B90D-44688705DD4A.root,rse_type=DISK \ No newline at end of file diff --git a/DMOps/file_invalidation_tool/input_examples/dids.txt b/DMOps/file_invalidation_tool/input_examples/dids.txt new file mode 100644 index 00000000..4497622f --- /dev/null +++ b/DMOps/file_invalidation_tool/input_examples/dids.txt @@ -0,0 +1 @@ +/DMsimp_t-S3D_uR_JChiChi_Mphi-2000_Mchi-350_Lambda-1p0_TuneCUETP8M1_13TeV-madgraph_pythia8/RunIIFall17NanoAODv7-PU2017_12Apr2018_Nano02Apr2020_102X_mc2017_realistic_v8-v1/NANOAODSIM#1ecf1b2a-df7e-4c8d-82e1-2bc0b49d5271 \ No newline at end of file diff --git a/DMOps/file_invalidation_tool/readme.md b/DMOps/file_invalidation_tool/readme.md new file mode 100644 index 00000000..ff4ceb17 --- /dev/null +++ b/DMOps/file_invalidation_tool/readme.md @@ -0,0 +1,120 @@ +# File Invalidation Tool for Rucio and DBS +## Overview + +This guide outlines the steps to run the file invalidation tool for Rucio and DBS using Docker image. The tool assists in invalidating specific files, datasets or containers within these systems, to ensure data consistency. Additionally, it has a running mode to check the integrity of files in a given RSE(checksum validation), and invalidate the corrupted replicas. Finally, the tool can also be used to invalidate all files in a given site. + +## Prerequisites, Folder Structure and tool input + +### Tool Input + +The tool has 5 running modes. It's important that your cert and key (decrypted) have enough permissions to invalidate on DBS and declare replicas as corrupted on Rucio, additionally to this it they require the following inputs and parameters: + +| Running Mode | Description | Tool Mode | Input File | Params | Auth Requirements | +| ----------- | ----------- | ----------- | ----------- | ----------- | ----------- | +| Global Invalidation | Invalidate all files from received files, datasets or containers list on Rucio and DBS | `global` | `.txt`: txt file containing list of files, datasets or containers | `--user `: Kerberos user for spark jobs
`--reason `: comment for invalidation
`--dry-run`(**optional**): Simulate the execution without actually performing the file invalidation
`--erase-mode`(**optional**): Erase empty DIDs | `./certs/usercert.pem`
`./certs/userkey.pem`
`./secrets/.keytab`| +| DBS Invalidation | Invalidate all files from received files, datasets or containers list only on DBS | `only-dbs` | `.txt`: txt file containing list of files, datasets or containers |`--user `: Kerberos user for spark jobs
`--reason `: comment for invalidation
`--dry-run`(**optional**): Simulate the execution without actually performing the file invalidation
`--erase-mode`(**optional**): Erase empty DIDs | `./certs/usercert.pem`
`./certs/userkey.pem`
`./secrets/.keytab`| +| Rucio Invalidation | Invalidate all files from received files, datasets or containers list only on Rucio | `only-rucio` | `.txt`: txt file containing list of files, datasets or containers | `--user `: Kerberos user for spark jobs
`--reason `: comment for invalidation
`--dry-run`(**optional**): Simulate the execution without actually performing the file invalidation
`--erase-mode`(**optional**): Erase empty DIDs | `./certs/usercert.pem`
`./certs/userkey.pem`
`./secrets/.keytab`| +| Integrity Validation | Validate integrity of files in the given RSE | `integrity-validation` | `.csv`: csv file containing list of files and RSE [FILENAME,RSE_EXPRESSION] | `--dry-run`(**optional**): Simulate the execution without actually performing the file invalidation in case of being corrupted | `./certs/usercert.pem`
`./certs/userkey.pem`| +| Site Invalidation | Invalidate in Rucio all files from received list at a specific site | `site-invalidation` | `.txt`: txt file containing list of files, datasets or containers | `--user `: Kerberos user for spark jobs
`--rse `: RSE to invalidate at
`--reason `: comment for invalidation
`--dry-run`(**optional**): Simulate the execution without actually performing the file invalidation | `./certs/usercert.pem`
`./certs/userkey.pem`
`./secrets/.keytab`| + +> **Note:** The userkey.pem should be decrypted. + +??? Example + **USERKEY decryption** + `openssl rsa -in -out userkey.pem` + + You would be asked to enter the password. + +??? Info + **Checksum Validation Mode** + + Some files could be heavy and may lead to exceed your lxplus quota. In case of seeing this error move your working directory to `/eos/user///` directory. + ```Bash + gfal-copy error: 122 (Disk quota exceeded) - errno reported by local system call Disk quota exceeded + ``` + +### Environment + +This script is thought to be run on **lxplus** or CERN server with access to `registry.cern.ch` and `/cvmfs/` directory. + +Setting all together, the working directory structure can change a bit, but it should look like this: +working_directory/ +├── dids.txt / replicas_validation.csv +├── certs/ +│ ├── usercert.pem +│ └── userkey.pem + +## Run File Invalidation tool + +### 1. CERN Registry Authentication + +1. Visit [cern registry](https://registry.cern.ch/). +2. Login via OIDC Provider. +3. Click on your username located in the top right. +4. Click on **User Profile** +5. Copy the **CLI Secret**, it will be used in the next step. + +### 2. Login into CERN Registry +```Bash +docker login registry.cern.ch -u +``` +- `docker login`: Logs in to the Docker registry. +- `registry.cern.ch`: CERN registry URL. +- `-u `: CERN registry username. + +It will ask you to enter your password. **Enter your CLI Secret.** + +### 3. Run the container + +```Bash +docker run -P \ + -v "$(pwd)/:/input/" \ + -v "$(pwd)/certs:/certs" \ + [-v "$(pwd)/secrets:/secrets" \] + --mount type=bind,source=/cvmfs/,target=/cvmfs/,readonly \ + --network host --rm registry.cern.ch/cmsrucio/file_invalidation_tool [Tool_Mode_Options] +``` +- `docker run`: Executes a Docker container. +- `-P`: Publishes all exposed ports to the host interfaces. +- Volumes mounted: + - `-v "$(pwd)/:/input/"`: Mounts the containers_inv.txt file from the host to /input/dids.txt within the container. + - `-v "$(pwd)/certs:/certs"`: Mounts the certs directory from the host to /certs within the container. It must contain the usercert.pem and userkey.pem. + - `-v "$(pwd)/secrets:/secrets"`: Mounts the secrets directory from the host to /secrets within the container. It must contain the keytab file. + - `--mount type=bind,source=/cvmfs/,target=/cvmfs/,readonly`: Binds the /cvmfs/ directory on the host as read-only within the container. +- `--network host`: Uses the host's network stack within the container. +- `--rm`: Automatically removes the container when it exits. +- `registry.cern.ch/cmsrucio/file_invalidation_tool`: Name of the Docker image to run. + +??? Example + + ```Bash + docker run -P \ + -v "$(pwd)/:/input/.txt" \ + -v "$(pwd)/certs:/certs" \ + -v "$(pwd)/secrets:/secrets" \ + --mount type=bind,source=/cvmfs/,target=/cvmfs/,readonly \ + --network host --rm registry.cern.ch/cmsrucio/file_invalidation_tool [global | only-dbs | only-rucio] --user --reason + ``` + + ```Bash + docker run -P \ + -v "$(pwd)/:/input/.csv" \ + -v "$(pwd)/certs:/certs" \ + --mount type=bind,source=/cvmfs/,target=/cvmfs/,readonly \ + --network host --rm registry.cern.ch/cmsrucio/file_invalidation_tool integrity-validation + ``` + + ```Bash + docker run -P \ + -v "$(pwd)/:/input/.txt" \ + -v "$(pwd)/certs:/certs" \ + -v "$(pwd)/secrets:/secrets" \ + --mount type=bind,source=/cvmfs/,target=/cvmfs/,readonly \ + --network host --rm registry.cern.ch/cmsrucio/file_invalidation_tool site-invalidation --rse --user --reason + ``` +## Additional Notes + +- The tool's output will provide details about the invalidation process. +- User Authorization: Ensure you have the necessary permissions to invalidate on DBS. + - The provided certificates will be used for DBS invalidation, in case of authorization errors, rucio invalidation will not be executed. + - Rucio Invalidation will be done using the the dmtops certificate and transfer_ops account since many users will not have permissions to develop this operation. \ No newline at end of file diff --git a/DMOps/file_invalidation_tool/src/container_invalidation_spark.py b/DMOps/file_invalidation_tool/src/container_invalidation_spark.py new file mode 100644 index 00000000..0e8c92ad --- /dev/null +++ b/DMOps/file_invalidation_tool/src/container_invalidation_spark.py @@ -0,0 +1,50 @@ +from pyspark.sql.functions import col, collect_list, concat_ws +import click as click +from CMSSpark.spark_utils import get_spark_session +from hadoop_queries import get_df_rse_locks, get_df_rse_replicas, get_df_contents +from pyspark.sql.window import Window + +@click.command() +@click.option('--filename', required=True, default=None, type=str, + help='Name of the text file having the datasets names') +@click.option('--rse', required=False, default=None, type=str, + help='RSE to look at') +def invalidate_containers(filename,rse): + spark = get_spark_session(app_name='global_containers_invalidation') + + #Read the containers to delete + filename = f'/user/dmtops/{filename}' + df_delete = spark.read.text(filename) + df_delete = df_delete.withColumnRenamed('value','CONTAINER') + + #Get the basic df + df_locks = get_df_rse_locks(spark) + df_replicas = get_df_rse_replicas(spark,rse) + df_contents = get_df_contents(spark).alias('co') + + #Get the content of the containers to delete (content includes filename, dataset and container) + df_delete = df_delete.join(df_contents,df_delete.CONTAINER==df_contents.CONTAINER,how='inner').select(['co.*']).alias('de') + + #Replicas to declare as bad + df_delete = df_delete.join(df_replicas,df_delete.FILENAME==df_replicas.NAME,how='inner').select(['de.*','RSE','REPLICA_STATE']).alias('de') + + #Rules protecting the replicas + df_delete = df_delete.join(df_locks,(df_delete.FILENAME==df_locks.NAME) & (df_delete.RSE == df_locks.RSE),how='left').select(['de.*','RULE_ID']).alias('de') + df_delete.cache() + + #Files to invalidate on DBS + df_delete.select('FILENAME').distinct().toPandas().to_csv('/input/dbs_files_inv.txt',index=False, header = False) + + windowSpec = Window.partitionBy('FILENAME') + df_delete.withColumn("RSES", collect_list(col("RSE")).over(windowSpec)) \ + .select(['FILENAME','RSES']).withColumn("RSES", concat_ws(";", "RSES")).distinct().toPandas().to_csv('/input/rucio_replicas_inv.csv',index=False) + + #Replicas to erase from Rucio + df_delete.select('DATASET').distinct().toPandas().to_csv('/input/datasets_inv.txt',index=False,header=False) + + #RSE is exported in case it's tape and require purge_replicas + df_delete.filter(col('RULE_ID').isNotNull()).select(['RULE_ID','RSE']).distinct()\ + .toPandas().to_csv('/input/rucio_rules_delete.csv',index=False) + +if __name__ == "__main__": + invalidate_containers() \ No newline at end of file diff --git a/DMOps/file_invalidation_tool/src/dataset_invalidation_spark.py b/DMOps/file_invalidation_tool/src/dataset_invalidation_spark.py new file mode 100644 index 00000000..367d3abb --- /dev/null +++ b/DMOps/file_invalidation_tool/src/dataset_invalidation_spark.py @@ -0,0 +1,54 @@ +import click as click +from CMSSpark.spark_utils import get_spark_session +from pyspark.sql.functions import col, collect_list, concat_ws +from hadoop_queries import get_df_rse_locks, get_df_rse_replicas, get_df_contents, get_df_dataset_level_rules +from pyspark.sql.window import Window + +@click.command() +@click.option('--filename', required=True, default=None, type=str, + help='Name of the text file having the datasets names') +@click.option('--rse', required=False, default=None, type=str, + help='RSE to look at') +def invalidate_datasets(filename,rse): + spark = get_spark_session(app_name='global_dataset_invalidation') + + #Read the containers to delete + filename = f'/user/dmtops/{filename}' + df_delete = spark.read.text(filename) + df_delete = df_delete.withColumnRenamed('value','DATASET') + + #Get the basic df + df_locks = get_df_rse_locks(spark) + df_replicas = get_df_rse_replicas(spark,rse) + df_contents = get_df_contents(spark).alias('co') + df_rules = get_df_dataset_level_rules(spark).alias('ru') + + #Get the content of the datasets to delete (content includes filename, dataset and container) + df_delete = df_delete.join(df_contents,df_delete.DATASET==df_contents.DATASET,how='inner').select(['co.*']).alias('de') + + #Replicas to declare as bad + df_delete = df_delete.join(df_replicas,df_delete.FILENAME==df_replicas.NAME,how='inner').select(['de.*','RSE','REPLICA_STATE']).alias('de') + + #Rules protecting the replicas + df_delete = df_delete.join(df_locks,(df_delete.FILENAME==df_locks.NAME) & (df_delete.RSE == df_locks.RSE),how='left')\ + .withColumnRenamed('RULE_ID','LOCK_RULE_ID').select(['de.*','LOCK_RULE_ID']) + + #Rules protecting the datasets or children files + df_delete = df_delete.join(df_rules, df_delete.LOCK_RULE_ID == df_rules.ID, how='left')\ + .withColumnRenamed('ID','RULE_ID').select(['de.*', 'RULE_ID']).alias('de') + df_delete.cache() + + #Files to invalidate on DBS + df_delete.select('FILENAME').distinct().toPandas().to_csv('/input/dbs_files_inv.txt',index=False, header = False) + + #Replicas to invalidate on Rucio + windowSpec = Window.partitionBy('FILENAME') + df_delete.withColumn("RSES", collect_list(col("RSE")).over(windowSpec)) \ + .select(['FILENAME','RSES']).withColumn("RSES", concat_ws(";", "RSES")).distinct().toPandas().to_csv('/input/rucio_replicas_inv.csv',index=False) + + #RSE is exported in case it's tape and require purge_replicas + df_delete.filter(col('RULE_ID').isNotNull()).select(['RULE_ID','RSE']).distinct()\ + .toPandas().to_csv('/input/rucio_rules_delete.csv',index=False) + +if __name__ == "__main__": + invalidate_datasets() \ No newline at end of file diff --git a/DMOps/file_invalidation_tool/src/file_invalidation_spark.py b/DMOps/file_invalidation_tool/src/file_invalidation_spark.py new file mode 100644 index 00000000..d75a4627 --- /dev/null +++ b/DMOps/file_invalidation_tool/src/file_invalidation_spark.py @@ -0,0 +1,70 @@ +""" +File : file_invalidation_spark.py +Author : Andres Manrique +Description : Receive a list of files desired to be invalidated in Rucio and DBS. + Generate files containing the replicas to be declared as bad and the rules that are file levelly protected +Generated files: rucio_replicas_inv.csv - List of replicas to be declared as bad in Rucio + "FILENAME": The name of the file to be declared as bad + "RSE": Site where the file is stored + rucio_rules_stuck.txt - List of rules that are suspended in Rucio and required to be re-evaluated + "RULE_ID": The ID of the rule to be re-evaluated + rucio_rules_delete.csv - List of file level rules to be deleted from Rucio + "RULE_ID": The ID of the rule to be deleted from Rucio +""" +import click as click +from CMSSpark.spark_utils import get_spark_session +from hadoop_queries import get_df_rse_locks, get_df_rse_replicas, get_df_contents, get_df_rules +from pyspark.sql.functions import col, collect_list, concat_ws +from pyspark.sql.window import Window + +@click.command() +@click.option('--filename', required=True, default=None, type=str, + help='Name of the text file having the datasets names') +@click.option('--rse', required=False, default=None, type=str, + help='RSE to look at') +def invalidate_files(filename, rse): + """ + Using Spark, and exports files containing the replicas to be declared as bad and the rules that are file levelly protected + """ + spark = get_spark_session(app_name='global_file_invalidation') + + #Read the files to delete + df_delete = spark.read.text(filename) + df_delete = df_delete.withColumnRenamed('value','FILENAME') + + #Get the basic df + df_locks = get_df_rse_locks(spark) + df_replicas = get_df_rse_replicas(spark,rse) + df_contents = get_df_contents(spark).alias('co') + df_rules = get_df_rules(spark).alias('ru') + + #Get the content of the containers to delete (content includes filename, dataset and container) + df_delete = df_delete.join(df_contents,df_delete.FILENAME==df_contents.FILENAME,how='inner').select(['co.*']).alias('de') + + #Replicas to declare as bad + df_delete = df_delete.join(df_replicas,df_delete.FILENAME==df_replicas.NAME,how='left').select(['de.*','RSE','REPLICA_STATE']).alias('de') + + windowSpec = Window.partitionBy('FILENAME') + df_delete.withColumn("RSES", collect_list(col("RSE")).over(windowSpec)) \ + .select(['FILENAME','RSES']).withColumn("RSES", concat_ws(";", "RSES")).distinct().toPandas().to_csv('/input/rucio_replicas_inv.csv',index=False) + + # Attach the replicas to the rules + df_delete = df_delete.join(df_locks,(df_delete.FILENAME==df_locks.NAME) & (df_delete.RSE == df_locks.RSE),how='left')\ + .withColumnRenamed('RULE_ID','LOCK_RULE_ID').select(['de.*','LOCK_RULE_ID']).alias('de') + df_rules = df_delete.join(df_rules, df_delete.LOCK_RULE_ID == df_rules.ID, how='left')\ + .withColumnRenamed('ID','RULE_ID')\ + .select(['de.*', 'RULE_ID', 'RULE_STATE', 'DID_TYPE']).alias('de') + df_rules.cache() + + #Rules protecting the replicas at File level + df_rules_delete = df_rules.filter(col('DID_TYPE') == 'F') + #RSE is exported in case it's tape and require purge_replicas + df_rules_delete.select(['RULE_ID','RSE']).distinct().toPandas().to_csv('/input/rucio_rules_delete.csv',index=False) + + #Rules that are suspended and require to be re-evaluated + df_rules_suspended = df_rules.filter(col('RULE_STATE') == 'S') + df_rules_suspended.select('RULE_ID').distinct().toPandas().to_csv('/input/rucio_rules_stuck.txt',index=False, header = False) + + +if __name__ == "__main__": + invalidate_files() \ No newline at end of file diff --git a/DMOps/file_invalidation_tool/src/hadoop_queries.py b/DMOps/file_invalidation_tool/src/hadoop_queries.py new file mode 100644 index 00000000..d0d57aca --- /dev/null +++ b/DMOps/file_invalidation_tool/src/hadoop_queries.py @@ -0,0 +1,87 @@ +from datetime import datetime, timedelta +from pyspark.sql.functions import ( + col, lower, from_unixtime, + hex as _hex +) + +#Hadoop Queries Functions +def get_df_rse_locks(spark): + TODAY = datetime.today().strftime('%Y-%m-%d') if datetime.now().hour >= 6 else (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d') + + HDFS_RUCIO_RSES = "/project/awg/cms/rucio/{}/rses/part*.avro".format(TODAY) + HDFS_RUCIO_LOCKS = "/project/awg/cms/rucio/{}/locks/part*.avro".format(TODAY) + + df_rses = spark.read.format('avro').load(HDFS_RUCIO_RSES)\ + .withColumn('ID', lower(_hex(col('ID'))))\ + .select(['ID','RSE']) + + df_locks = spark.read.format('avro').load(HDFS_RUCIO_LOCKS)\ + .withColumn('RULE_ID', lower(_hex(col('RULE_ID'))))\ + .withColumn('RSE_ID', lower(_hex(col('RSE_ID'))))\ + .select(['NAME','RSE_ID','RULE_ID','STATE','BYTES']) + return df_locks.join(df_rses, df_locks.RSE_ID==df_rses.ID, how='inner').withColumnRenamed('STATE','LOCK_STATE') + +def get_df_rse_replicas(spark,rse=None): + TODAY = datetime.today().strftime('%Y-%m-%d') if datetime.now().hour >= 6 else (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d') + + HDFS_RUCIO_RSES = "/project/awg/cms/rucio/{}/rses/part*.avro".format(TODAY) + HDFS_RUCIO_REPLICAS = "/project/awg/cms/rucio/{}/replicas/part*.avro".format(TODAY) + + df_rses = None + if rse is None: + df_rses = spark.read.format('avro').load(HDFS_RUCIO_RSES)\ + .withColumn('ID', lower(_hex(col('ID'))))\ + .select(['ID','RSE']) + else: + df_rses = spark.read.format('avro').load(HDFS_RUCIO_RSES)\ + .withColumn('ID', lower(_hex(col('ID'))))\ + .filter(col('RSE')==rse)\ + .select(['ID','RSE']) + + df_replicas = spark.read.format('avro').load(HDFS_RUCIO_REPLICAS)\ + .withColumn('RSE_ID', lower(_hex(col('RSE_ID'))))\ + .withColumn('TOMBSTONE',from_unixtime(col('TOMBSTONE')/1000,'yyyy-MM-dd'))\ + .select(['RSE_ID', 'NAME', 'BYTES','STATE','TOMBSTONE']).withColumnRenamed('STATE','REPLICA_STATE') + return df_replicas.join(df_rses, df_replicas.RSE_ID==df_rses.ID, how='inner').select(['RSE_ID', 'NAME', 'BYTES','REPLICA_STATE','TOMBSTONE','RSE']).alias('r') + +def get_df_contents(spark): + TODAY = datetime.today().strftime('%Y-%m-%d') if datetime.now().hour >= 6 else (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d') + + HDFS_RUCIO_CONTENTS = "/project/awg/cms/rucio/{}/contents/part*.avro".format(TODAY) + + df_contents = spark.read.format('avro').load(HDFS_RUCIO_CONTENTS).select(['NAME','CHILD_NAME','DID_TYPE','CHILD_TYPE']) + df_containers = df_contents.filter(col('DID_TYPE')=='C').filter(col('CHILD_TYPE')=='D').select(['NAME','CHILD_NAME'])\ + .withColumnRenamed('NAME','CONTAINER').withColumnRenamed('CHILD_NAME','DATASET').alias('c') + df_datasets = df_contents.filter(col('DID_TYPE')=='D').filter(col('CHILD_TYPE')=='F').withColumnRenamed('CHILD_NAME','FILENAME').select(['FILENAME','NAME']) + return df_containers.join(df_datasets,df_containers.DATASET == df_datasets.NAME,how='left')\ + .select(['CONTAINER','DATASET','FILENAME']) + +def get_df_dataset_level_rules(spark): + TODAY = datetime.today().strftime('%Y-%m-%d') if datetime.now().hour >= 6 else (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d') + + HDFS_RUCIO_RULES = "/project/awg/cms/rucio/{}/rules/part*.avro".format(TODAY) + #Get the rules for datasets and files + df_rules = spark.read.format('avro').load(HDFS_RUCIO_RULES).withColumn('ID', lower(_hex(col('ID'))))\ + .filter((col('DID_TYPE')=='D') | (col('DID_TYPE')=='F')).select(['ID','NAME']) + df_rules.cache() + return df_rules + +def get_df_rules(spark): + TODAY = datetime.today().strftime('%Y-%m-%d') if datetime.now().hour >= 6 else (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d') + + HDFS_RUCIO_RULES = "/project/awg/cms/rucio/{}/rules/part*.avro".format(TODAY) + #Get the rules for datasets and files + df_rules = spark.read.format('avro').load(HDFS_RUCIO_RULES).withColumn('ID', lower(_hex(col('ID'))))\ + .withColumnRenamed('STATE','RULE_STATE').select(['ID','NAME','RULE_STATE', 'DID_TYPE']) + df_rules.cache() + return df_rules + +def get_df_file_level_rules(spark): + TODAY = datetime.today().strftime('%Y-%m-%d') if datetime.now().hour >= 6 else (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d') + + HDFS_RUCIO_RULES = "/project/awg/cms/rucio/{}/rules/part*.avro".format(TODAY) + #Get the rules for datasets and files + df_rules = spark.read.format('avro').load(HDFS_RUCIO_RULES).withColumn('ID', lower(_hex(col('ID'))))\ + .filter(col('DID_TYPE')=='F').select(['ID','NAME']) + df_rules.cache() + return df_rules \ No newline at end of file diff --git a/DMOps/file_invalidation_tool/src/integrity_validation.py b/DMOps/file_invalidation_tool/src/integrity_validation.py new file mode 100644 index 00000000..fef17410 --- /dev/null +++ b/DMOps/file_invalidation_tool/src/integrity_validation.py @@ -0,0 +1,112 @@ +import os +import gfal2 +import logging +import argparse +import contextlib +import pandas as pd +from pathlib import Path +from rucio.client import Client + +ctx = gfal2.creat_context() + +@contextlib.contextmanager +def suppress_logs(logger_name): + logger = logging.getLogger(logger_name) + current_level = logger.level + logger.setLevel(logging.ERROR) + try: + yield + finally: + logger.setLevel(current_level) + +def validate_file_exists(file_path): + """ + Validate if a file exists at the given file path. + + Args: + file_path (str): The path to the file. + + Returns: + str: The valid file path. + + Raises: + argparse.ArgumentTypeError: If the file does not exist. + + """ + if not os.path.exists(file_path): + raise argparse.ArgumentTypeError(f"The file '{file_path}' does not exist.") + return file_path + +def validate_arguments(): + parser = argparse.ArgumentParser() + parser.add_argument('files', type=validate_file_exists, help='File containing the replicas of files to verify checksum. Format CSV: FILENAME,RSE') + parser.add_argument('--dry-run', action='store_true', help='Test the script without invalidating anything') + + args = parser.parse_args() + dids_df = pd.read_csv(args.files) + if len(dids_df) == 0: + raise ValueError("Files list can't be empty") + if not ['FILENAME', 'RSE_EXPRESSION'].issubset(dids_df.columns): + raise ValueError('File must contain columns FILENAME and RSE_EXPRESSION') + + return dids_df, args.dry_run + +def check_file_integrity(pfn: str, adler: str) -> bool: + filename = Path(pfn).name + local_path = Path("/data") / filename + file_uri = f"file://{local_path}" + + logging.info(f'Copying file {pfn} to local directory...') + + try: + with suppress_logs('gfal2'): + ctx.filecopy(pfn, file_uri) + integrity = ctx.checksum(file_uri, "adler32") == adler + return integrity + except Exception as e: + logging.error(f"Error during file integrity check: {e}") + if '404' in str(e): + # declare replica as bad + return False + else: + # Keep replica for manual evaluation + return True + finally: + try: + os.remove(local_path) + except OSError as e: + logging.warning(f"Failed to remove temporary file {local_path}: {e}") + +def checksum_invalidate_dids(dids_df, dry_run=False): + client = Client() + invalidate = [] + for filename, rse_expression in dids_df.values: + for replica in client.list_replicas([{'scope':'cms','name':filename}],rse_expression=rse_expression): + adler = replica['adler32'] + for rse in replica['rses'].keys(): + if replica['states'][rse] == 'AVAILABLE': + for pfn in replica['rses'][rse]: + try: + integrity = check_file_integrity(pfn,adler) + if integrity: + logging.info("File: %s is valid at RSE: %s" % (filename,rse)) + else: + invalidate.append({'scope': 'cms', 'name': filename, 'rse': rse}) + break + except Exception as ex: + logging.error("Unable to validate file: %s at RSE: %s" % (filename,rse)) + logging.error(ex) + + if len(invalidate) > 0 and not dry_run: + client.declare_bad_file_replicas(invalidate,reason='FIT: Checksum mismatch', force=True) + for record in invalidate: + logging.info("Declared file %s as bad at %s" % (record['name'], record['rse'])) + elif dry_run: + for record in invalidate: + logging.info("Would declare file %s as bad at %s" % (record['name'], record['rse'])) + else: + logging.info("No files were declared as bad") + +if __name__ == '__main__': + dids_df, dry_run = validate_arguments() + checksum_invalidate_dids(dids_df, dry_run) \ No newline at end of file diff --git a/DMOps/file_invalidation_tool/src/invalidate_dbs.py b/DMOps/file_invalidation_tool/src/invalidate_dbs.py new file mode 100644 index 00000000..84d8d670 --- /dev/null +++ b/DMOps/file_invalidation_tool/src/invalidate_dbs.py @@ -0,0 +1,97 @@ +import argparse +import logging + +from dbs.apis.dbsClient import DbsApi +from dbs.exceptions.dbsClientException import dbsClientException + +TEST_url="https://cmsweb-testbed.cern.ch/dbs/int/global/DBSWriter" +url="https://cmsweb.cern.ch/dbs/prod/global/DBSWriter" + +# 'TEST_URL' or just 'url' + +def validate_arguments(): + parser = argparse.ArgumentParser() + parser.add_argument('filename', type=str, help='Name of the file containing the list of DIDs') + parser.add_argument('--datasets', type=str, help='Name of the file that contains the list of datasets to invalidate') + parser.add_argument('--test', action='store_true', help='Use test DBS instance') + + args = parser.parse_args() + files = [] + datasets = [] + + if not args.filename: + raise ValueError('Filename is required.') + else: + filename = args.filename + with open(filename, 'r') as f: + files = f.readlines() + files = [x.strip().replace('\n','') for x in files] + if len(files) == 0: + raise ValueError("Files list can't be empty") + + if args.datasets: + with open(args.datasets, 'r') as f: + datasets = f.readlines() + datasets = [x.strip().replace('\n','') for x in datasets] + + test = args.test + return files, datasets, test + +def invalidate_files(files, dbsApi=None, test=False): + consecutiveErrors = 0 + if not dbsApi: + dbsApi = DbsApi(url=TEST_url) if test else DbsApi(url=url) + + for lfn in files: + if not test: + try: + result = dbsApi.updateFileStatus(logical_file_name=lfn, is_file_valid=0, lost=0) + if result == []: + consecutiveErrors = 0 + logging.info("Invalidation OK for file: %s" % lfn) + else: + consecutiveErrors += 1 + logging.error("Invalidation FAILED for file: %s" % lfn) + logging.error(result) + except Exception as ex: + consecutiveErrors += 1 + logging.error("Invalidation FAILED for file: %s" % lfn) + logging.error(ex) + + if consecutiveErrors >= 5 or consecutiveErrors == len(files): + raise Exception("Too many consecutive errors, check you have the right permissions to invalidate files on DBS or try again later.") + else: + logging.info("Would invalidate file on DBS: %s" % lfn) + + + +def invalidate_datasets(files, datasets, test): + dbsApi = DbsApi(url=TEST_url) if test else DbsApi(url=url) + + invalidate_files(files, dbsApi, test) + + for dataset in datasets: + if not test: + try: + dataset_invalidation = dbsApi.updateDatasetType(dataset=dataset, dataset_access_type="INVALID") + if dataset_invalidation == []: + logging.info("Invalidation OK for dataset: %s" % dataset) + else: + logging.error("Invalidation FAILED for dataset: %s" % dataset) + logging.error(dataset_invalidation) + except Exception as ex: + logging.error("Invalidation FAILED for dataset: %s" % dataset) + logging.error(ex) + else: + logging.info("Would invalidate dataset on DBS: %s" % dataset) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO, format='%(levelname)s %(asctime)s - %(message)s',datefmt='%Y-%m-%d %H:%M:%S') + params = validate_arguments() + + files = params[0] + datasets = params[1] + test = params[2] + + invalidate_datasets(files, datasets, test) \ No newline at end of file diff --git a/DMOps/file_invalidation_tool/src/invalidate_rucio.py b/DMOps/file_invalidation_tool/src/invalidate_rucio.py new file mode 100644 index 00000000..0771a681 --- /dev/null +++ b/DMOps/file_invalidation_tool/src/invalidate_rucio.py @@ -0,0 +1,292 @@ +""" +File : invalidate_rucio.py +Author : Andres Manrique +Description : Declare replicas as bad to force declaring them as lost (Rucio invalidation for invalid DBS files which are valid on Rucio) + Erase empty datasets and containers from Rucio + Delete rules of empty datasets or empty containers. + Stuck suspended rules of invalid DBS files that are valid on Rucio (force rule re-evaluation) + +""" + +import asyncio +import os +import logging +import pandas as pd +import argparse +from rucio.client import Client + +logging.basicConfig(level=logging.INFO, format='%(levelname)s %(asctime)s - %(message)s',datefmt='%Y-%m-%d %H:%M:%S') + +def validate_file_exists(file_path): + """ + Validate if a file exists at the given file path. + + Args: + file_path (str): The path to the file. + + Returns: + str: The valid file path. + + Raises: + argparse.ArgumentTypeError: If the file does not exist. + + """ + if not os.path.exists(file_path): + raise argparse.ArgumentTypeError(f"The file '{file_path}' does not exist.") + return file_path + +def validate_arguments(): + """ + Validate the command line arguments and parse them to obtain the required files and parameters. + + Parameters: + None + + Returns: + replicas (pandas.DataFrame): A DataFrame containing the replicas data. + datasets (list): A list of datasets to be deleted. + containers (list): A list of containers to be deleted. + rules (list): A list of rules to be deleted. + dry_run (bool): A flag indicating if the script should be run in dry-run mode. + + Raises: + ValueError: If the datasets list is empty when attempting to delete containers. + """ + + parser = argparse.ArgumentParser() + parser.add_argument('replicas', type=validate_file_exists, help='File containing the replicas of files invalid on DBS and valid on Rucio. File with CSVformat: FILENAME,RSE', + metavar='REPLICAS_FILE') + parser.add_argument('--datasets', type=validate_file_exists, help='File containing the list of empty datasets (no files associated to them)') + parser.add_argument('--containers', type=validate_file_exists, help='File containing the list of empty containers (no files associated to them)') + parser.add_argument('--rules-delete', type=validate_file_exists, help='Rules associated to the empty datasets, empty containers or lost files. Avoid [0/0/0] rules') + parser.add_argument('--rules-stuck', type=validate_file_exists, help='Suspended Rules associated with lost files that requires to be re-evaluated') #Used for rules with lost files, but all the rule files + parser.add_argument('--dry-run', action='store_true', help='Test the script without deleting anything') + parser.add_argument('--erase-mode', action='store_true', help='Erase empty datasets and containers') + parser.add_argument('--reason', type=str, help='Comment for the deletion') + + args = parser.parse_args() + replicas_df = pd.DataFrame() + datasets = [] + containers = [] + rules_delete_df = pd.DataFrame() + rules_stuck = [] + + #Replicas to declare as bad + if args.replicas: + replicas_df = pd.read_csv(args.replicas) + if not set(['FILENAME', 'RSES']).issubset(replicas_df.columns): + raise ValueError('File csv file must have the following columns: FILENAME, RSES') + + #Check the reason exists + if args.reason is None or len(args.reason) == 0: + raise ValueError("Reason for deletion is required.") + + if args.erase_mode: + #Empty to datasets to erase from Rucio. They are deleted to avoid create rules [0/0/0] + if args.datasets: + with open(args.datasets,'r') as f: + datasets = f.readlines() + datasets = [x.strip() for x in datasets] + + #Empty to containers to erase from Rucio. It shouldn't be possible to delete containers without datasets (prevent ghosts). They are deleted to avoid create rules [0/0/0] + if args.containers: + with open(args.containers,'r') as f: + containers = f.readlines() + containers = [x.strip() for x in containers] + if len(datasets) == 0 and len(containers) > 0: + raise ValueError('Datasets list cannot be empty deleting containers') + + #Check if the file with the rules to delete exists. If empty nothing will happen + #Empty rules are deleted to avoid empty rules [0/0/0] + if args.rules_delete: + rules_delete_df = pd.read_csv(args.rules_delete) + if not set(['RULE_ID','RSE']).issubset(rules_delete_df.columns): + raise ValueError('File csv file must have the following columns: RULE_ID,RSE') + + #Check if the file with the rules to stuck exists. If empty nothing will happen + if args.rules_stuck: + with open(args.rules_stuck,'r') as f: + rules_stuck = f.readlines() + rules_stuck = [x.strip() for x in rules_stuck] + + return replicas_df, datasets, containers, rules_delete_df, rules_stuck, args.reason,args.dry_run + +async def replicas_worker(queue, reason, dry_run): + """ + Asynchronously processes files from a queue and declares them as bad if necessary. + + Args: + queue (asyncio.Queue): The queue containing files to be processed. + dry_run (bool): Whether to perform a dry run or not. + + Returns: + None + """ + client = Client() + + while True: + record = await queue.get() + file = record[0] + rses = record[1].split(';') + dids = [] + for rse in rses: + dids.append({'scope':'cms', 'name':file, 'rse': rse}) + try: + if not dry_run: + # Force in case replicas was already declared as bad in the past + client.declare_bad_file_replicas(dids, reason=reason, force=True) + logging.info("Declared file %s as bad at %s" % (file, rses)) + else: + logging.info("Would declare file %s as bad at %s" % (file, rses)) + except Exception as e: + logging.error("Error declaring file %s as bad at %s: %s" % (file, rses, e)) + queue.task_done() + +async def stuck_rules_worker(queue, reason, dry_run): + """ + Asynchronously unlocks and stuck rules from a queue. This is used for the suspended rules + + Args: + queue (asyncio.Queue): The queue containing the stuck rules. + dry_run (bool): Whether to perform a dry run or not. + + Returns: + None + """ + + client = Client() + while True: + rule = await queue.get() + try: + if not dry_run: + client.update_replication_rule(rule, {'state':'STUCK', 'comment': reason}) + logging.info("Re-stuck Rule %s" % (rule)) + else: + logging.info("Would re-stuck Rule %s" % (rule)) + except Exception as e: + logging.error("Error set rule to stuck %s: %s" % (rule, e)) + queue.task_done() + +async def delete_rules_worker(queue, reason, dry_run): + """ + Asynchronously deletes replication rules from the queue. + + Args: + queue (asyncio.Queue): The queue containing the replication rules to be deleted. + dry_run (bool): If True, the rules will not actually be deleted, but the deletion will be logged. + + Returns: + None + """ + + client = Client() + while True: + record = await queue.get() + rule = record[0] + rse_expression = record[1] + try: + if not dry_run: + client.update_replication_rule(rule_id=rule, options={'locked': False, 'comment': reason}) + purge_replicas = 'Tape' in rse_expression + client.delete_replication_rule(rule_id=rule,purge_replicas=purge_replicas) + logging.info("Deleted rule %s" % (rule)) + else: + logging.info("Would Delete rule %s" % (rule)) + except Exception as e: + logging.error("Error updating rule %s: %s" % (rule, e)) + queue.task_done() + +async def erase_dids_worker(queue, dry_run): + """ + Erases emptyDIDs from the queue. + + Args: + queue (asyncio.Queue): The queue containing the DIDs to be erased. + dry_run (bool): If True, the DIDs will not be erased. Instead, information about the DIDs that would be deleted is logged. + + Returns: + None + """ + client = Client() + while True: + did = await queue.get() + try: + if not dry_run: + #Equivalent to rucio erase + client.set_metadata(scope='cms', name=did, key='lifetime', value=86400) + logging.info("Deleted did %s" % (did)) + else: + logging.info("Would delete did %s" % (did)) + except Exception as e: + logging.error("Error deleting did %s: %s" % (did, e)) + queue.task_done() + +async def main(replicas_df,reason,rules_stuck=[], rules_delete=pd.DataFrame(),datasets=[],containers=[],dry_run=False): + """ + Asynchronously executes tasks to process replicas, datasets, and containers. + + :param replicas_df: The DataFrame containing replica information. + :param datasets: A list of datasets to process. + :param containers: A list of containers to process. + :param rules_delete: A list of rules to delete. + :param rules_stuck: A list of stuck rules. + :param dry_run: A flag indicating whether to perform a dry run. + :return: None + """ + loop = asyncio.get_event_loop() + + #Start delete rules worker + if len(rules_delete) > 0: + num_workers_rules = 10 + queue_rules = asyncio.Queue() + workers_rules_delete = [loop.create_task(delete_rules_worker(queue_rules, reason, dry_run)) for _ in range(num_workers_rules)] + for rule_id,rse_expression in rules_delete[['RULE_ID','RSE']].values: + await queue_rules.put((rule_id,rse_expression)) + + await queue_rules.join() + for worker_task in workers_rules_delete: + worker_task.cancel() + # Start bad replicas workers + if len(replicas_df) > 0: + num_workers_replicas = 30 + queue_replicas = asyncio.Queue() + workers_files = [loop.create_task(replicas_worker(queue_replicas,reason, dry_run)) for _ in range(num_workers_replicas)] + for file, rse in replicas_df[['FILENAME','RSES']].values: + await queue_replicas.put((file, rse)) + + await queue_replicas.join() + for worker_task in workers_files: + worker_task.cancel() + + + #Start delete dids worker + if len(containers) > 0 or len(datasets) > 0: + queue_dids = asyncio.Queue() + num_workers_dids = 20 + workers_dids = [loop.create_task(erase_dids_worker(queue_dids,dry_run)) for _ in range(num_workers_dids)] + for dataset in datasets: + await queue_dids.put(dataset) + for container in containers: + await queue_dids.put(container) + + await queue_dids.join() + for worker_task in workers_dids: + worker_task.cancel() + + #Start stuck rules worker + if len(rules_stuck) > 0: + num_workers_stuck_rules = 10 + queue_stuck_rules = asyncio.Queue() + queue_rules_stuck = [loop.create_task(stuck_rules_worker(queue_stuck_rules, reason, dry_run)) for _ in range(num_workers_stuck_rules)] + for rule in rules_stuck: + await queue_stuck_rules.put(rule) + + await queue_stuck_rules.join() + for worker_task in queue_rules_stuck: + worker_task.cancel() + + +if __name__ == '__main__': + replicas_df, datasets, containers, rules_delete_df, rules_stuck, reason ,dry_run = validate_arguments() + loop = asyncio.get_event_loop() + loop.run_until_complete(main(replicas_df,reason,rules_delete_df, rules_stuck, datasets, containers,dry_run)) \ No newline at end of file diff --git a/DMOps/file_invalidation_tool/src/run_invalidations.py b/DMOps/file_invalidation_tool/src/run_invalidations.py new file mode 100644 index 00000000..d55a5766 --- /dev/null +++ b/DMOps/file_invalidation_tool/src/run_invalidations.py @@ -0,0 +1,233 @@ +import os +import logging +import argparse +import subprocess +import asyncio +from glob import glob +from enum import Enum +import pandas as pd +from invalidate_dbs import invalidate_datasets as invalidate_dbs_datasets +from invalidate_dbs import invalidate_files as invalidate_dbs_files +from integrity_validation import checksum_invalidate_dids +from invalidate_rucio import main as invalidate_rucio + +logging.basicConfig(level=logging.INFO, format='%(levelname)s %(asctime)s - %(message)s',datefmt='%Y-%m-%d %H:%M:%S') + +class RunningMode(Enum): + GLOBAL = 'global' + ONLY_DBS = 'only-dbs' + ONLY_RUCIO = 'only-rucio' + SITE_INVALIDATION = 'site-invalidation' + INTEGRITY_VALIDATION = 'integrity-validation' +def check_arguments(): + """ + Parse and check the arguments provided, and perform various validations based on the running mode. + """ + parser = argparse.ArgumentParser() + parser.add_argument('running_mode', type=RunningMode, help='Running mode: global, only-dbs, only-rucio, site-invalidation, integrity-validation') + parser.add_argument('--user', type=str, help='Kerberos user') + parser.add_argument('--dry-run', action='store_true', help='Test the script without deleting anything') + parser.add_argument('--erase-mode', action='store_true', help='Erase empty datasets and containers') + parser.add_argument('--rse', type=str, help='Site name on which to perform the invalidations') + parser.add_argument('--reason', type=str, help='Comment for the deletion') + + args = parser.parse_args() + + if args.running_mode != RunningMode.INTEGRITY_VALIDATION: + if not args.user or not args.user.strip(): + raise ValueError("The 'user' argument cannot be null or empty.") + os.environ["USER"] = args.user + + if not args.running_mode == RunningMode.INTEGRITY_VALIDATION and not args.reason: + raise ValueError('Reason is required.') + + # Integrity invalidation mode + if args.running_mode == RunningMode.INTEGRITY_VALIDATION: + files = glob('/input/*.csv') + did_level = None + if len(files) != 1: + raise ValueError('Only one csv file is expected as /input/ parameter') + replicas_df = pd.read_csv(files[0]) + if not ('FILENAME' in replicas_df.columns and 'RSE_EXPRESSION' in replicas_df.columns): + raise ValueError('File must contain columns FILENAME and RSE_EXPRESSION') + if len(replicas_df) == 0: + raise ValueError("Files list can't be empty") + for did in replicas_df['FILENAME'].values: + current_level = None + if '.' in did: + current_level = 'file' + elif '#' in did: + current_level = 'dataset' + else: + current_level = 'container' + if did_level is None: + did_level = current_level + if did_level != current_level: + raise ValueError('All DIDs must be only at file level') + if did_level != 'file': + raise ValueError('checksum-validation mode can only be used with file level DIDs') + if args.erase_mode: + raise ValueError('Erase mode is not allowed when using checksum-validation mode. Only [--reason,--dry-run] are allowed when using checksum-validation mode') + return args.running_mode,replicas_df, args.dry_run + + files = glob('/input/*.txt') + if len(files) != 1: + raise ValueError('Only one txt file is expected as /input/ parameter') + + dids = [] + with open(files[0],'r') as f: + dids = f.readlines() + dids = [did.strip().replace('\n','') for did in dids] + + did_level = None + for did in dids: + current_level = None + if '.' in did: + current_level = 'file' + elif '#' in did: + current_level = 'dataset' + else: + current_level = 'container' + if did_level is None: + did_level = current_level + if did_level != current_level: + raise ValueError('All DIDs must be in the same level') + + # Site invalidation mode + if args.running_mode == RunningMode.SITE_INVALIDATION: + if not args.rse: + raise ValueError('RSE is required when using site-invalidation mode') + if args.erase_mode: + raise ValueError('Erase mode is not allowed when using site-invalidation mode. Only [--reason,--rse,--dry-run] are allowed when using site-invalidation mode') + return args.running_mode, files[0].split('/')[-1],did_level, dids, args.rse, args.reason, args.dry_run + + # Global, DBS or Rucio invalidation mode + if args.rse: + raise ValueError('RSE is used only when using site-invalidation mode') + return args.running_mode,files[0].split('/')[-1], did_level, dids, args.reason,args.dry_run, args.erase_mode + +def init_proxy(): + """ + Validate the existence and names the two PEM files in the /certs/ directory and init proxy with them. + """ + #Validate certs + certs = glob('/certs/*.pem') + if len(certs) != 2: + raise ValueError('Only two pem files are expected') + certs = [file.split('/')[1] for file in certs] + if 'userkey.pem' in certs and 'usercert.pem' in certs: + raise ValueError('Only usercert.pem and userkey.pem are expected') + + subprocess.run(['chmod', '400', '/certs/usercert.pem'], check=True, capture_output=True, text=True) + subprocess.run(['chmod', '400', '/certs/userkey.pem'], check=True, capture_output=True, text=True) + try: + subprocess.run(['voms-proxy-init', '-voms', 'cms', '-rfc', '-valid', '192:00', '--cert', '/certs/usercert.pem', '--key', '/certs/userkey.pem'], check=True, capture_output=True, text=True) + except subprocess.CalledProcessError as e: + if "Created proxy" not in e.stdout: + logging.error(f"Command '{e.cmd}' returned non-zero exit status {e.returncode}.") + logging.error("Error message:", e.stderr) + raise ValueError(e.stderr) + +def submit_spark_list_generation_job(did_level, input_file,rse=None): + logging.info('Starting spark job') + if rse is None: + result = subprocess.run(['/src/submit_invalidation.sh', did_level, input_file], check=True, capture_output=True, text=True) + else: + result = subprocess.run(['/src/submit_invalidation.sh', did_level, input_file,'--rse' ,rse], check=True, capture_output=True, text=True) + if result.returncode != 0: + raise ValueError(result.stderr) + logging.info('Finished spark job') + logging.info('--------------------------------------------') + +def dbs_invalidation(did_level, dids, dry_run=False): + logging.info('Starting DBS invalidation') + if did_level == 'file': + invalidate_dbs_files(dids, test=dry_run) + else: + files = [] + with open('/input/dbs_files_inv.txt', 'r') as f: + files = f.readlines() + files = [x.strip().replace('\n', '') for x in files] + if did_level == 'dataset': + invalidate_dbs_files(files, test=dry_run) + else: + invalidate_dbs_datasets(files, dids, test=dry_run) + logging.info('Finished DBS invalidation') + logging.info('--------------------------------------------') + +def rucio_invalidation(did_level, dids, reason, dry_run=False, erase_mode=False): + logging.info('Starting Rucio invalidation') + replicas_df = pd.read_csv('/input/rucio_replicas_inv.csv') + datasets = [] + containers = [] + rules_delete_df = pd.read_csv('/input/rucio_rules_delete.csv') + rules_stuck = [] + + if did_level == 'file': + with open('/input/rucio_rules_stuck.txt', 'r') as f: + rules_stuck = f.readlines() + rules_stuck = [x.strip() for x in rules_stuck] + elif did_level == 'dataset': + datasets = dids + elif did_level == 'container': + containers = dids + with open('/input/datasets_inv.txt', 'r') as f: + datasets = f.readlines() + datasets = [x.strip() for x in datasets] + logging.info('Found %d replicas to invalidate' % len(replicas_df)) + logging.info('Found %d rules to delete' % len(rules_delete_df)) + logging.info('Found %d rules to stuck' % len(rules_stuck)) + if erase_mode: + logging.info('Found %d datasets to erase' % len(datasets)) + logging.info('Found %d containers to erase' % len(containers)) + + loop = asyncio.get_event_loop() + reason = "File Invalidation Tool - " + reason + if erase_mode: + loop.run_until_complete(invalidate_rucio(replicas_df, reason, rules_stuck, rules_delete_df, datasets, containers,dry_run)) + else: + loop.run_until_complete(invalidate_rucio(replicas_df, reason, rules_stuck, dry_run=dry_run)) + logging.info('Finished Rucio invalidation') + +if __name__ == '__main__': + args = check_arguments() + init_proxy() + # Checksum validation + if args[0] == RunningMode.INTEGRITY_VALIDATION: + _, dids_df, dry_run = args + checksum_invalidate_dids(dids_df, dry_run) + # Local invalidation + elif args[0] == RunningMode.SITE_INVALIDATION: + _, input_file, did_level, dids, rse, reason, dry_run = args + try: + submit_spark_list_generation_job(did_level, input_file, rse=rse) + rucio_invalidation(did_level, dids, reason, dry_run=dry_run) + except subprocess.CalledProcessError as e: + logging.error("Error running shell script:") + logging.error(e.stderr) + # Global, Rucio or DBS invalidation + else: + running_mode,input_file, did_level, dids, reason, dry_run, erase_mode = args + if running_mode == RunningMode.ONLY_DBS: + #Not necessary when the level is file + if did_level != 'file': + submit_spark_list_generation_job(did_level, input_file) + dbs_invalidation(did_level, dids, dry_run) + elif running_mode == RunningMode.ONLY_RUCIO: + try: + submit_spark_list_generation_job(did_level, input_file) + rucio_invalidation(did_level, dids, reason, dry_run=dry_run, erase_mode=erase_mode) + except subprocess.CalledProcessError as e: + logging.error("Error running shell script:") + logging.error(e.stderr) + else: + try: + submit_spark_list_generation_job(did_level, input_file) + dbs_invalidation(did_level, dids, dry_run=dry_run) + rucio_invalidation(did_level, dids, reason, dry_run=dry_run, erase_mode=erase_mode) + except subprocess.CalledProcessError as e: + logging.info("Error running shell script:") + logging.info(e.stderr) + except Exception as e: + logging.info("Error running invalidation script:") + logging.info(e) \ No newline at end of file diff --git a/DMOps/file_invalidation_tool/src/submit_invalidation.sh b/DMOps/file_invalidation_tool/src/submit_invalidation.sh new file mode 100755 index 00000000..39543fe8 --- /dev/null +++ b/DMOps/file_invalidation_tool/src/submit_invalidation.sh @@ -0,0 +1,103 @@ +#!/bin/bash +# Display usage instructions +display_usage() { + echo "Usage: $0 [--rse ]" + echo " submit_script: Specify the execution level ('container', 'dataset', or 'file')." + echo " filename: Name of the file to process." + echo " --rse : (Optional) Specify the RSE." +} + +# Ensure minimum number of arguments +if [ $# -lt 2 ]; then + display_usage + exit 1 +fi + +# Initialize variables +exec_script="" +filename="" +rse="" + +# Parse command-line arguments +while [[ $# -gt 0 ]]; do + case "$1" in + --rse) + rse="$2" + shift 2 + ;; + *) + if [ -z "$exec_script" ]; then + exec_script="$1" + elif [ -z "$filename" ]; then + filename="$1" + else + display_usage + exit 1 + fi + shift + ;; + esac +done + +# Validate required arguments +if [ -z "$exec_script" ] || [ -z "$filename" ]; then + display_usage + exit 1 +fi + +# Validate execution level +valid_options=("container" "dataset" "file") +if [[ ! " ${valid_options[@]} " =~ " ${exec_script} " ]]; then + echo "Error: 'submit_script' must be one of: container, dataset, or file." >&2 + exit 1 +fi + +# Set the Python script to execute +exec_script="${exec_script}_invalidation_spark.py" + +# Load utilities and set environment +set -e +script_dir="$(cd "$(dirname "$0")" && pwd)" +. /data/CMSSpark/bin/utils/common_utils.sh + +# Authenticate using Kerberos +kinit -kt "/secrets/${USER}.keytab" "${USER}" + +# Configure Hadoop +source hadoop-setconf.sh analytix 3.2 + +# Spark submit arguments +spark_submit_args=( + --master yarn + --conf spark.ui.showConsoleProgress=false + --conf spark.shuffle.useOldFetchProtocol=true + --conf spark.shuffle.service.enabled=true + --conf "spark.driver.bindAddress=0.0.0.0" + --conf spark.driver.host="$(hostname)" + --conf "spark.driver.port=${DRIVERPORT}" + --conf "spark.driver.blockManager.port=${BMPORT}" + --conf "spark.ui.port=${UIPORT}" + --driver-memory 32g + --num-executors 30 + --executor-memory 32g + --packages org.apache.spark:spark-avro_2.12:3.4.0 +) + +# Prepare input arguments for Python script +py_input_args=(--filename "$filename") +if [ -n "$rse" ]; then + py_input_args+=(--rse "$rse") +fi + +# Ensure HDFS is prepared +hdfs_file="hdfs://analytix/user/${USER}/${filename}" +if /usr/hdp/hadoop/bin/hdfs dfs -test -e "$hdfs_file"; then + /usr/hdp/hadoop/bin/hdfs dfs -rm "$hdfs_file" +fi +/usr/hdp/hadoop/bin/hdfs dfs -cp "file:///input/${filename}" "$hdfs_file" + +# Submit Spark job +spark-submit "${spark_submit_args[@]}" "$script_dir/$exec_script" "${py_input_args[@]}" 2>&1 + +# Clean up HDFS +/usr/hdp/hadoop/bin/hdfs dfs -rm "$hdfs_file" \ No newline at end of file