Precompute clustering #18

Open · wants to merge 7 commits into base: main
3 changes: 2 additions & 1 deletion README.md
@@ -210,7 +210,7 @@ Format:
AnnData object
obs: 'cell_type', 'batch'
var: 'hvg', 'hvg_score', 'feature_name'
-obsm: 'X_pca'
+obsm: 'X_pca', 'clustering'
obsp: 'knn_distances', 'knn_connectivities'
layers: 'counts', 'normalized'
uns: 'dataset_id', 'dataset_name', 'dataset_url', 'dataset_reference', 'dataset_summary', 'dataset_description', 'dataset_organism', 'normalization_id', 'knn'
@@ -229,6 +229,7 @@ Data structure:
| `var["hvg_score"]` | `double` | A ranking of the features by hvg. |
| `var["feature_name"]` | `string` | A human-readable name for the feature, usually a gene symbol. |
| `obsm["X_pca"]` | `double` | The resulting PCA embedding. |
+| `obsm["clustering"]` | `integer` | Leiden clustering results at different resolutions. |
| `obsp["knn_distances"]` | `double` | K nearest neighbors distance matrix. |
| `obsp["knn_connectivities"]` | `double` | K nearest neighbors connectivities matrix. |
| `layers["counts"]` | `integer` | Raw counts. |
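For illustration, the new `obsm["clustering"]` slot would be consumed roughly like this (a sketch, not part of the PR; the column name follows the `leiden_r{resolution}` pattern used by the run script below):

import anndata as ad

adata = ad.read_h5ad("dataset.h5ad")
clusters = adata.obsm["clustering"]  # DataFrame with one column per clustering resolution
labels = clusters["leiden_r0.8"]     # Leiden assignments at resolution 0.8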
10 changes: 7 additions & 3 deletions scripts/create_resources/test_resources.sh
@@ -14,10 +14,14 @@ DATASET_DIR=resources_test/task_batch_integration
mkdir -p $DATASET_DIR

# process dataset
-viash run src/data_processors/process_dataset/config.vsh.yaml -- \
+nextflow run . \
+  -main-script target/nextflow/workflows/process_datasets/main.nf \
+  -profile docker \
   --input "$RAW_DATA/cxg_immune_cell_atlas/dataset.h5ad" \
-  --output_dataset "$DATASET_DIR/cxg_immune_cell_atlas/dataset.h5ad" \
-  --output_solution "$DATASET_DIR/cxg_immune_cell_atlas/solution.h5ad"
+  --publish_dir "$DATASET_DIR/cxg_immune_cell_atlas" \
+  --output_dataset output_dataset.h5ad \
+  --output_solution output_solution.h5ad \
+  --output_state state.yaml

# run one method
viash run src/methods/combat/config.vsh.yaml -- \
4 changes: 4 additions & 0 deletions src/api/file_solution.yaml
@@ -41,6 +41,10 @@ info:
        name: X_pca
        description: The resulting PCA embedding.
        required: true
+     - type: integer
+       name: clustering
+       description: Leiden clustering results at different resolutions.
+       required: true
    obsp:
      - type: double
        name: knn_distances
30 changes: 30 additions & 0 deletions src/data_processors/precompute_clustering_merge/config.vsh.yaml
@@ -0,0 +1,30 @@
name: precompute_clustering_merge
namespace: data_processors
label: Merge clustering precomputations
summary: Merge precomputed clustering results for the input dataset
arguments:
  - name: --input
    type: file
    direction: input
    required: true
  - name: --output
    type: file
    direction: output
    required: true
  - type: file
    name: --clusterings
    description: Clustering results to merge
    direction: input
    required: true
    multiple: true
resources:
  - type: python_script
    path: script.py
engines:
  - type: docker
    image: openproblems/base_python:1.0.0
runners:
  - type: executable
  - type: nextflow
    directives:
      label: [midtime, midmem, lowcpu]
28 changes: 28 additions & 0 deletions src/data_processors/precompute_clustering_merge/script.py
@@ -0,0 +1,28 @@
import anndata as ad
import pandas as pd

## VIASH START
par = {
    "input": "resources_test/task_batch_integration/cxg_immune_cell_atlas/dataset.h5ad",
    "clusterings": ["output.h5ad", "output2.h5ad"],
    "output": "output3.h5ad",
}
## VIASH END

print("Read clusterings", flush=True)
clusterings = []
for clus_file in par["clusterings"]:
    adata = ad.read_h5ad(clus_file)
    obs_filt = adata.obs.filter(regex='leiden_r[0-9.]+')
Comment (Contributor): I would adjust the pattern as described in the clustering script.
    clusterings.append(obs_filt)

print("Merge clusterings", flush=True)
merged = pd.concat(clusterings, axis=1)

print("Read input", flush=True)
input = ad.read_h5ad(par["input"])

input.obsm["clustering"] = merged
Comment (Contributor): I assume you don't store the clusters under .obs. However, the metric will look in .obs for those clusters, so I'd recommend storing them there.

print("Store outputs", flush=True)
input.write_h5ad(par["output"], compression="gzip")
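Following the comment above, a minimal sketch of additionally exposing the merged clusterings under `.obs` (an illustration, assuming the per-resolution files preserve the input's cell index):

# Hypothetical addition, per review: also store each clustering as a
# categorical .obs column so metrics that search .obs can find it.
for col in merged.columns:
    input.obs[col] = merged[col].astype("category")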
34 changes: 34 additions & 0 deletions src/data_processors/precompute_clustering_run/config.vsh.yaml
@@ -0,0 +1,34 @@
name: precompute_clustering_run
namespace: data_processors
label: Run clustering precomputations
summary: Run clustering on the input dataset
arguments:
  - name: --input
    type: file
    direction: input
    required: true
  - name: --output
    type: file
    direction: output
    required: true
  - type: double
    name: resolution
    default: 0.8
    description: Resolution parameter for clustering
resources:
  - type: python_script
    path: script.py
engines:
  - type: docker
    image: openproblems/base_python:1.0.0
    setup:
      - type: python
        pypi:
          - scanpy
          - igraph
          - leidenalg
runners:
  - type: executable
  - type: nextflow
    directives:
      label: [midtime, midmem, lowcpu]
39 changes: 39 additions & 0 deletions src/data_processors/precompute_clustering_run/script.py
@@ -0,0 +1,39 @@
import anndata as ad

# check if we can use GPU
USE_GPU = False
try:
    import subprocess
    assert subprocess.run('nvidia-smi', shell=True, stdout=subprocess.DEVNULL).returncode == 0
    from rapids_singlecell.tl import leiden
    USE_GPU = True
except Exception as e:
Comment (Contributor) on lines +5 to +10: Nice to see that you implemented rapids_singlecell, however we found that the clustering results are quite different from the CPU implementation and often don't make sense. According to this thread, this bug could have been fixed recently. But for simplicity, I would go with the igraph implementation, which will be the default in scanpy in the future.
    from scanpy.tl import leiden

## VIASH START
par = {
    "input": "resources_test/task_batch_integration/cxg_immune_cell_atlas/dataset.h5ad",
    "output": "output.h5ad",
    "resolution": 0.8,
}
## VIASH END

n_cell_cpu = 300_000

print("Read input", flush=True)
input = ad.read_h5ad(par["input"])
Comment (Contributor): You could use the partial reading function to only load obs and obsp.
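A sketch of that suggestion (an assumption, not PR code): `read_elem` lives in `anndata.experimental` for anndata < 0.11, and `uns` is read as well, since `leiden(..., neighbors_key="knn")` looks up `adata.uns["knn"]`.

import h5py
from anndata.experimental import read_elem  # moves out of experimental in newer anndata releases

# Load only the slots clustering needs, instead of the full h5ad
with h5py.File(par["input"], "r") as f:
    input = ad.AnnData(
        obs=read_elem(f["obs"]),
        obsp=read_elem(f["obsp"]),
        uns=read_elem(f["uns"]),
    )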


key = f'leiden_r{par["resolution"]}'
Comment (Contributor): I would change the resolution pattern to `f'leiden_{par["resolution"]}'`, since that's what the optimal clustering function is expecting: https://github.com/theislab/scib/blob/d35727305501f905424f19a0e95b61dcda61c460/scib/metrics/clustering.py#L56-L58

Comment (Contributor): I would set the CPU algorithm to igraph, as per the future warning message from scanpy.

Suggested change:
kwargs = dict()
if not USE_GPU:
    kwargs |= dict(
        flavor='igraph',
        n_iterations=2,
    )

leiden(
    input,
    resolution=par["resolution"],
    neighbors_key="knn",
    key_added=key,
Comment (Contributor), suggested change:
-    key_added=key,
+    key_added=key,
+    **kwargs,
)

print("Store outputs", flush=True)
output = ad.AnnData(
    obs=input.obs[[key]],
)
output.write_h5ad(par["output"], compression="gzip")
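Taken together, the review suggestions would reshape the clustering step roughly as follows (a sketch; the `leiden_{resolution}` key pattern and the igraph kwargs come from the comments above, not from merged code):

key = f'leiden_{par["resolution"]}'

# On CPU, opt in to the igraph backend that scanpy will default to in the future
kwargs = dict()
if not USE_GPU:
    kwargs |= dict(flavor="igraph", n_iterations=2)

leiden(
    input,
    resolution=par["resolution"],
    neighbors_key="knn",
    key_added=key,
    **kwargs,
)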
10 changes: 8 additions & 2 deletions src/data_processors/process_dataset/script.py
@@ -12,8 +12,8 @@
"output": "output.h5ad"
}
meta = {
-    "config": "target/nextflow/batch_integration/process_dataset/.config.vsh.yaml",
-    "resources_dir": "src/common/helper_functions"
+    "config": "target/nextflow/data_processors/process_dataset/.config.vsh.yaml",
+    "resources_dir": "target/nextflow/data_processors/process_dataset"
}
## VIASH END

@@ -80,6 +80,12 @@ def compute_batched_hvg(adata, n_hvgs):
"variance_ratio": variance_ratio
}

+print(">> Recompute neighbors", flush=True)
+del adata.uns["knn"]
+del adata.obsp["knn_connectivities"]
+del adata.obsp["knn_distances"]
+sc.pp.neighbors(adata, use_rep="X_pca", n_neighbors=30, key_added="knn")
Comment (Contributor): Definitely good to compute the neighbors as a precomputation step, since it's also used for other metrics. We might want to make n_neighbors configurable in the future.
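A sketch of what that could look like (hypothetical `n_neighbors` parameter; a matching `--n_neighbors` argument would have to be added to the component config):

# Hypothetical: take the neighbor count from a component parameter
sc.pp.neighbors(
    adata,
    use_rep="X_pca",
    n_neighbors=par.get("n_neighbors", 30),
    key_added="knn",
)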


print(">> Create output object", flush=True)
output_dataset = subset_h5ad_by_format(
    adata,
2 changes: 1 addition & 1 deletion src/metrics/clustering_overlap/config.vsh.yaml
@@ -59,7 +59,7 @@ engines:
    setup:
      - type: python
        pypi:
-         - scib==1.1.5
+         - scib==1.1.6
runners:
  - type: executable
  - type: nextflow
9 changes: 6 additions & 3 deletions src/metrics/clustering_overlap/script.py
@@ -8,6 +8,7 @@
par = {
    'adata_integrated': 'resources_test/task_batch_integration/cxg_immune_cell_atlas/integrated_full.h5ad',
    'output': 'output.h5ad',
+    "resolutions": [0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8],  # TODO needs to be added to config
}

meta = {
@@ -25,18 +26,20 @@
adata.uns |= read_anndata(par['input_solution'], uns='uns').uns

print('Run optimal Leiden clustering', flush=True)
+cluster_key = "leiden"
cluster_optimal_resolution(
    adata=adata,
    label_key="cell_type",
-    cluster_key='cluster',
+    cluster_key=cluster_key,
    cluster_function=sc.tl.leiden,
+    resolutions=par["resolutions"],
)

print('Compute ARI score', flush=True)
-ari_score = ari(adata, cluster_key='cluster', label_key="cell_type")
+ari_score = ari(adata, cluster_key=cluster_key, label_key="cell_type")

print('Compute NMI score', flush=True)
-nmi_score = nmi(adata, cluster_key='cluster', label_key="cell_type")
+nmi_score = nmi(adata, cluster_key=cluster_key, label_key="cell_type")

print("Create output AnnData object", flush=True)
output = ad.AnnData(
2 changes: 1 addition & 1 deletion src/metrics/isolated_label_f1/config.vsh.yaml
@@ -47,7 +47,7 @@ engines:
    setup:
      - type: python
        pypi:
-         - scib==1.1.5
+         - scib==1.1.6
runners:
  - type: executable
  - type: nextflow
5 changes: 4 additions & 1 deletion src/metrics/isolated_label_f1/script.py
@@ -6,6 +6,7 @@
par = {
    'input_integrated': 'resources_test/task_batch_integration/cxg_immune_cell_atlas/integrated_full.h5ad',
    'output': 'output.h5ad',
+    "resolutions": [0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8],  # TODO needs to be added to config
}

meta = {
@@ -26,7 +27,9 @@
score = isolated_labels_f1(
    adata,
    label_key="cell_type",
-    batch_key='batch',
+    batch_key="batch",
+    cluster_key="leiden",
+    resolutions=par["resolutions"],
    embed=None,
    iso_threshold=None,
    verbose=True,
9 changes: 9 additions & 0 deletions src/workflows/process_datasets/config.vsh.yaml
@@ -18,6 +18,13 @@ argument_groups:
        __merge__: /src/api/file_solution.yaml
        required: true
        direction: output
+  - name: Clustering
+    arguments:
+      - name: "--resolutions"
+        type: double
+        multiple: true
+        default: [0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
+        description: Resolution parameter for clustering

Comment (Contributor): These resolutions must be kept consistent between precomputation and the metrics function. The default range from scib is [0.1, 0.2, ..., 1.9, 2.0].
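For reference, the scib default range mentioned above can be written out as (a convenience snippet, not part of the PR):

# scib's default clustering resolutions: 0.1, 0.2, ..., 1.9, 2.0
resolutions = [round(0.1 * i, 1) for i in range(1, 21)]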

resources:
  - type: nextflow_script
@@ -29,6 +36,8 @@ dependencies:
  - name: validation/check_dataset_with_schema
    repository: openproblems
  - name: data_processors/process_dataset
+  - name: data_processors/precompute_clustering_run
+  - name: data_processors/precompute_clustering_merge

runners:
  - type: nextflow
46 changes: 46 additions & 0 deletions src/workflows/process_datasets/main.nf
@@ -38,6 +38,52 @@ workflow run_wf {
      state.dataset != null
    }

    // precompute clustering of the input data at various resolutions
    | flatMap { id, state ->
      state.resolutions.collect { resolution ->
        def newId = "${id}_r${resolution}"
        def newState = state + [
          "resolution": resolution,
          "prevId": id
        ]
        [newId, newState]
      }
    }

    // precompute clustering at this resolution
    | precompute_clustering_run.run(
      fromState: ["input": "dataset", "resolution": "resolution"],
      toState: ["output_clustering": "output"]
    )

    // group by original dataset id
    | map{ id, state ->
      [state.prevId, state]
    }
    | groupTuple()

    // merge the clustering results into one state
    | map{ id, states ->
      if (states.size() == 0) {
        throw new RuntimeException("Expected at least one state, but got ${states.size()}")
      }
      if (states.size() != states[0].resolutions.size()) {
        throw new RuntimeException("Expected ${states[0].resolutions.size()} states, but got ${states.size()}")
      }

      def clusterings = states.collect { it.output_clustering }
      def newState = states[0] + ["clusterings": clusterings]

      [id, newState]
    }

    // merge clustering results into dataset h5ad
    | precompute_clustering_merge.run(
      fromState: ["input": "dataset", "clusterings": "clusterings"],
      toState: ["dataset": "output"]
    )

    // process the dataset
    | process_dataset.run(
      fromState: [ input: "dataset" ],
      toState: [