included S3 decorators
Acribbs committed Oct 29, 2024
1 parent 7f5edf3 commit 021da81
Showing 3 changed files with 210 additions and 258 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -59,3 +59,5 @@ _test_commandline.yaml
# sample workflow
means.txt
sample*

.idea
357 changes: 99 additions & 258 deletions cgatcore/pipeline/__init__.py
@@ -1,202 +1,34 @@
'''pipeline.py - Tools for ruffus pipelines
===========================================
'''
pipeline.py - Tools for CGAT Ruffus Pipelines
=============================================
The :mod:`pipeline` module contains various utility functions for
interfacing CGAT ruffus pipelines with an HPC cluster, uploading data
to databases, providing parameterization, and more.
This module provides a comprehensive set of tools to facilitate the creation and management
of data processing pipelines using CGAT Ruffus. It includes functionalities for pipeline control,
logging, parameterization, task execution, database uploads, temporary file management, and
integration with AWS S3.
It is a collection of utility functions covering the following topics:
**Features:**
* `pipeline control`_
* `Logging`_
* `Parameterisation`_
* `Running tasks`_
* `database upload`_
* `Report building`_
- **Pipeline Control:** Command-line interface for executing, showing, and managing pipeline tasks.
- **Logging:** Configures logging to files and RabbitMQ for real-time monitoring.
- **Parameterization:** Loads and manages configuration parameters from various files (see the brief sketch after this list).
- **Task Execution:** Manages the execution of tasks, supporting both local and cluster environments.
- **Database Upload:** Utilities for uploading processed data to databases.
- **Temporary File Management:** Functions to handle temporary files and directories.
- **AWS S3 Integration:** Support for processing files stored in AWS S3.
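As referenced in the feature list above, here is a minimal parameter-loading sketch. The `pipeline.yml` file name and the `database_name` key are illustrative; it assumes the long-standing cgat-core convention of reading configuration into a global `PARAMS` dictionary via `P.get_parameters()` (listed as `getParameters` in the older, commented-out `__all__` further down this file):

```python
from cgatcore import pipeline as P

# Later files in the list override values from earlier ones.
PARAMS = P.get_parameters(["pipeline.yml"])

# Values are then available to tasks, e.g. PARAMS.get("database_name").
```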
See :doc:`pipelines/pipeline_template` for a pipeline illustrating the
use of this module. See :ref:`pipelineSettingUp` on how to set up a
pipeline.
**Example Usage:**
```python
from cgatcore import pipeline as P
from ruffus import suffix


@P.transform("input.txt", suffix(".txt"), ".processed.txt")
def process_data(infile, outfile):
    # Processing logic here
    pass


if __name__ == "__main__":
    P.main()
```
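Building on this skeleton, the following sketch (hypothetical file names; it assumes the usual cgat-core conventions that `P.run()` interpolates `%(...)s` placeholders from the calling function's local variables and picks up resource requests such as `job_memory` from the same scope) shows a task whose shell statement is dispatched locally or to the cluster, depending on the command-line options documented below:

```python
from cgatcore import pipeline as P
from ruffus import transform, suffix


@transform("input.txt", suffix(".txt"), ".txt.gz")
def compress_data(infile, outfile):
    # Resource request picked up by P.run() from this function's locals.
    job_memory = "4G"
    # %(infile)s / %(outfile)s are interpolated from the local namespace.
    statement = "gzip -c %(infile)s > %(outfile)s"
    P.run(statement)
```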
pipeline control
----------------

:mod:`pipeline` provides a :func:`main` function that provides command
line control to a pipeline. To use it, add::

    import cgatcore.pipeline as P

    # ...

    def main(argv=None):
        if argv is None:
            argv = sys.argv
        P.main(argv)

    if __name__ == "__main__":
        sys.exit(P.main(sys.argv))

to your pipeline script. Typing::

    python my_pipeline.py --help
will provide the following output:
Usage:
usage: <pipeline> [OPTIONS] [CMD] [target]
Execute pipeline mapping.
Commands can be any of the following
make <target>
run all tasks required to build *target*
show <target>
show tasks required to build *target* without executing them
plot <target>
plot image (using inkscape) of pipeline state for *target*
debug <target> [args]
debug a method using the supplied arguments. The method <target>
in the pipeline is run without checking any dependencies.
config
write new configuration files pipeline.yml with default values
dump
write pipeline configuration to stdout
touch
touch files only, do not run
regenerate
regenerate the ruffus checkpoint file
check
check if requirements (external tool dependencies) are satisfied.
clone <source>
create a clone of a pipeline in <source> in the current
directory. The cloning process aims to use soft linking to files
(not directories) as much as possible. Time stamps are
preserved. Cloning is useful if a pipeline needs to be re-run from
a certain point but the original pipeline should be preserved.
Options:
--version show program's version number and exit
-h, --help show this help message and exit
--pipeline-action=PIPELINE_ACTION
action to take [default=none].
--pipeline-format=PIPELINE_FORMAT
pipeline format [default=svg].
-n, --dry-run perform a dry run (do not execute any shell commands)
[default=False].
-c CONFIG_FILE, --config-file=CONFIG_FILE
benchmark configuration file [default=pipeline.yml].
-f FORCE_RUN, --force-run=FORCE_RUN
force running the pipeline even if there are up-to-
date tasks. If option is 'all', all tasks will be
rerun. Otherwise, only the tasks given as arguments
will be rerun. [default=False].
-p MULTIPROCESS, --multiprocess=MULTIPROCESS
number of parallel processes to use on submit host
(different from number of jobs to use for cluster
jobs) [default=none].
-e, --exceptions echo exceptions immediately as they occur
[default=True].
-i, --terminate terminate immediately at the first exception
[default=none].
-d, --debug output debugging information on console, and not the
logfile [default=False].
-s VARIABLES_TO_SET, --set=VARIABLES_TO_SET
explicitly set parameter values [default=[]].
--input-glob=INPUT_GLOBS, --input-glob=INPUT_GLOBS
glob expression for input filenames. The exact format
is pipeline specific. If the pipeline expects only a
single input, `--input-glob=*.bam` will be sufficient.
If the pipeline expects multiple types of input, a
qualifier might need to be added, for example
`--input-glob=bam=*.bam --input-glob=bed=*.bed.gz`.
Giving this option overrides the default of a pipeline
looking for input in the current directory or
as specified in the config file. [default=[]].
--checksums=RUFFUS_CHECKSUMS_LEVEL
set the level of ruffus checksums [default=0].
-t, --is-test this is a test run [default=False].
--engine=ENGINE engine to use [default=local].
--always-mount force mounting of arvados keep [False]
--only-info only update meta information, do not run
[default=False].
--work-dir=WORK_DIR working directory. Will be created if it does not
exist [default=none].
--input-validation perform input validation before starting
[default=False].
pipeline logging configuration:
--pipeline-logfile=PIPELINE_LOGFILE
primary logging destination.[default=pipeline.log].
--shell-logfile=SHELL_LOGFILE
filename for shell debugging information. If it is not
an absolute path, the output will be written into the
current working directory. If unset, no logging will
be output. [default=none].
Script timing options:
--timeit=TIMEIT_FILE
store timing information in file [none].
--timeit-name=TIMEIT_NAME
name in timing file for this class of jobs [all].
--timeit-header add header for timing information [none].
Common options:
--random-seed=RANDOM_SEED
random seed to initialize number generator with
[none].
-v LOGLEVEL, --verbose=LOGLEVEL
loglevel [1]. The higher, the more output.
--log-config-filename=LOG_CONFIG_FILENAME
Configuration file for logger [logging.yml].
--tracing=TRACING enable function tracing [none].
-? output short help (command line options only).
cluster options:
--no-cluster, --local
do not use cluster - run locally [False].
--cluster-priority=CLUSTER_PRIORITY
set job priority on cluster [none].
--cluster-queue=CLUSTER_QUEUE
set cluster queue [none].
--cluster-num-jobs=CLUSTER_NUM_JOBS
number of jobs to submit to the queue to execute in
parallel [none].
--cluster-parallel=CLUSTER_PARALLEL_ENVIRONMENT
name of the parallel environment to use [none].
--cluster-options=CLUSTER_OPTIONS
additional options for cluster jobs, passed on to
queuing system [none].
--cluster-queue-manager=CLUSTER_QUEUE_MANAGER
cluster queuing system [sge].
--cluster-memory-resource=CLUSTER_MEMORY_RESOURCE
resource name to allocate memory with [none].
--cluster-memory-default=CLUSTER_MEMORY_DEFAULT
default amount of memory to allocate [unlimited].
Input/output options:
-I FILE, --stdin=FILE
file to read stdin from [default = stdin].
-L FILE, --log=FILE
file with logging information [default = stdout].
-E FILE, --error=FILE
file with error information [default = stderr].
-S FILE, --stdout=FILE
file where output is to go [default = stdout].
Documentation on using pipelines is at :ref:`getting_started-Examples`.
Logging
-------
@@ -283,7 +115,7 @@ def main(argv=None):
several utility functions for conveniently uploading data. The :func:`load`
method uploads data in a tab-separated file::
@transform("*.tsv.gz", suffix(".tsv.gz"), ".load")
@P.transform("*.tsv.gz", suffix(".tsv.gz"), ".load")
def loadData(infile, outfile):
    P.load(infile, outfile)
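In many CGAT pipelines the upload is tuned by forwarding csv2db options
through the ``options`` argument of :func:`load` (a hedged sketch: the
``--add-index`` flag and the ``gene_id`` column are illustrative, not taken
from this commit)::

    @P.transform("*.tsv.gz", suffix(".tsv.gz"), ".load")
    def loadDataWithIndex(infile, outfile):
        # index the (hypothetical) gene_id column after the upload
        P.load(infile, outfile, options="--add-index=gene_id")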
@@ -321,74 +153,83 @@ def loadData(infile, outfile):
'''
import os
# cgatcore/pipeline/__init__.py

# import submodules into namespace
# Import existing pipeline functionality
from cgatcore.pipeline.control import *
from cgatcore.pipeline.database import *
from cgatcore.pipeline.files import *
from cgatcore.pipeline.cluster import *
from cgatcore.pipeline.execution import *
from cgatcore.pipeline.utils import *
from cgatcore.pipeline.parameters import *
from cgatcore.pipeline.utils import *

# Import original Ruffus decorators
from ruffus import (
    transform,
    merge,
    split,
    originate,
    follows
)

# Import S3-aware decorators and functions
from cgatcore.remote.file_handler import (
    s3_transform,
    s3_merge,
    s3_split,
    s3_originate,
    s3_follows,
    S3Mapper,
    s3_aware
)

# Expose the S3Mapper instance if it's needed elsewhere
s3_mapper = S3Mapper()

# Add S3-related utility functions
def configure_s3(aws_access_key_id=None, aws_secret_access_key=None, region_name=None):
    """
    Configure AWS credentials for S3 access.
    If credentials are not provided, it will use the default AWS configuration.
    """
    import boto3
    session = boto3.Session(
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name=region_name
    )
    s3_mapper.s3.S3 = session.resource('s3')
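
# Usage sketch (illustrative only; the region below is a placeholder, not a
# value from this repository). Explicit credentials are optional -- with no
# arguments, boto3's default credential chain is used (environment variables,
# shared credentials file, or an instance/role profile):
#
#     configure_s3()                         # rely on the default AWS config
#     configure_s3(region_name="eu-west-2")  # pin a region, keep default keys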

# You can add more S3-related utility functions here as needed

# Add any other pipeline-related imports or functionality here

# Include a version number for the pipeline module
__version__ = "0.1.0" # Update this as needed

# Add a docstring for the module
__doc__ = """
This module provides pipeline functionality for cgat-core, including support for AWS S3.
It includes both standard Ruffus decorators and S3-aware decorators. The S3-aware decorators
can be used to seamlessly work with both local and S3-based files in your pipelines.
Example usage:
from cgatcore import pipeline as P
# Using standard Ruffus decorator (works as before)
@P.transform("input.txt", suffix(".txt"), ".processed")
def process_local_file(infile, outfile):
# Your processing logic here
pass
# Using S3-aware decorator
@P.s3_transform("s3://my-bucket/input.txt", suffix(".txt"), ".processed")
def process_s3_file(infile, outfile):
# Your processing logic here
pass
# Configure S3 credentials if needed
P.configure_s3(aws_access_key_id="YOUR_KEY", aws_secret_access_key="YOUR_SECRET")
"""

# __all__ = [
# # backwards incompatibility
# "clone",
# "touch",
# "snip",
# # execution.py
# "run",
# "execute",
# "shellquote",
# "buildStatement",
# "submit",
# "joinStatements",
# "cluster_runnable",
# "run_pickled",
# # database.py
# "tablequote",
# "toTable",
# "build_load_statement",
# "load",
# "concatenateAndLoad",
# "mergeAndLoad",
# "connect",
# "createView",
# "getdatabaseName",
# "importFromIterator",
# # Utils.py
# "add_doc",
# "isTest",
# "getCallerLocals",
# "getCaller",
# # Control.py
# "main",
# "peekParameters",
# # Files.py
# "getTempFile",
# "getTempDir",
# "getTempFilename",
# "checkScripts",
# "checkExecutables",
# # Local.py
# "run_report",
# "publish_report",
# "publish_notebooks",
# "publish_tracks",
# "getProjectDirectories",
# "getpipelineName",
# "getProjectId",
# "getProjectName",
# "isCGAT",
# # Parameters.py
# "getParameters",
# "loadParameters",
# "matchParameter",
# "substituteParameters",
# "asList",
# "checkParameter",
# "isTrue",
# "configToDictionary",
# ]