Add setup and performance docs
coufon committed Feb 4, 2024
1 parent 25d5d45 commit efb2f2d
Showing 5 changed files with 292 additions and 31 deletions.
34 changes: 5 additions & 29 deletions README.md
@@ -30,20 +30,20 @@ Unify data in your entire machine learning lifecycle with **Space**, a comprehen
- [Ground truth database of LabelStudio](notebooks/label_studio_tutorial.ipynb)
- [Transforms and materialized views: Segment Anything as example](notebooks/segment_anything_tutorial.ipynb)
- [Incrementally build embedding vector indexes](notebooks/incremental_embedding_index.ipynb)
- [Parallel ingestion from WebDataset](notebooks/webdataset_ingestion.ipynb)

## Space 101

- Space uses [Arrow](https://arrow.apache.org/docs/python/index.html) in the API surface, e.g., schema, filter, data IO.
- All file paths in Space are [relative](./docs/design.md#relative-paths); datasets are immediately usable after downloading or moving.
- Space stores data itself, or a reference to data, in Parquet files. The reference can be the address of a row in an ArrayRecord file, or the path of a standalone file (limited support, see `space.core.schema.types.files`).
- `space.TfFeatures` is a built-in field type providing serializers for nested dicts of numpy arrays, based on [TFDS FeaturesDict](https://www.tensorflow.org/datasets/api_docs/python/tfds/features/FeaturesDict).
- Please find more information in [the design page](docs/design.md).
- Please find more information in the [design](docs/design.md) and [performance](docs/performance.md) docs.

## Quick Start

- [Install](#install)
- [Cloud Storage](#cloud-storage)
- [Cluster setup](#cluster-setup)
- [Cluster Setup and Performance Tuning](#cluster-setup-and-performance-tuning)
- [Create and Load Datasets](#create-and-load-datasets)
- [Write and Read](#write-and-read)
- [Transform and Materialized Views](#transform-and-materialized-views)
@@ -63,33 +63,9 @@ cd python
pip install .[dev]
```

### Cloud Storage
### Cluster Setup and Performance Tuning

Optionally, setup [GCS FUSE](https://cloud.google.com/storage/docs/gcs-fuse) to use files on Google Cloud Storage (GCS) (or [S3](https://github.com/s3fs-fuse/s3fs-fuse), [Azure](https://github.com/Azure/azure-storage-fuse)):

```bash
gcsfuse <mybucket> "/path/to/<mybucket>"
```

Space has not yet implemented Cloud Storage file systems. FUSE is the current suggested approach.

### Cluster Setup

Optionally, setup a cluster to run Space operations distributedly. We support Ray clusters, on the Ray cluster head/worker nodes:
```bash
# Start a Ray head node (IP 123.45.67.89, for example).
# See https://docs.ray.io/en/latest/ray-core/starting-ray.html for details.
ray start --head --port=6379
```

Using [Cloud Storage + FUSE](#cloud-storage) is required in the distributed mode, because the Ray cluster and the client machine should operate on the same directory of files. Run `gcsfuse` on all machines and the mapped local directory paths **must be the same**.

Run the following code on the client machine to connect to the Ray cluster:
```py
import ray
# Connect to the Ray cluster.
ray.init(address="ray://123.45.67.89:10001")
```
See the [setup and performance doc](/docs/performance.md#ray-runner-setup).

### Create and Load Datasets

93 changes: 93 additions & 0 deletions docs/performance.md
@@ -0,0 +1,93 @@
## Cluster Setup and Performance Tuning

Data operations in Space can run distributed across a Ray cluster. Ray nodes access the same Space dataset files via Cloud Storage or distributed file systems.

## Setup

### Cloud Storage

Set up [GCS FUSE](https://cloud.google.com/storage/docs/gcs-fuse) to use files on Google Cloud Storage (GCS), or the equivalent FUSE tools for [S3](https://github.com/s3fs-fuse/s3fs-fuse) and [Azure](https://github.com/Azure/azure-storage-fuse):

```bash
gcsfuse <mybucket> "/path/to/<mybucket>"
```

Space does not yet implement Cloud Storage file systems natively; FUSE is the currently suggested approach.
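
Once the bucket is mounted, Space treats it like any other local directory. For example, a catalog can point at a path under the mount (a minimal sketch; the `datasets` subdirectory is a placeholder):

```py
from space import DirCatalog

# The FUSE mount exposes the bucket as a local directory, so Space datasets
# under it are addressed by ordinary file system paths.
catalog = DirCatalog("/path/to/<mybucket>/datasets")
```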

### Cluster Setup

On the Ray cluster head/worker nodes:
```bash
# Start a Ray head node (IP 123.45.67.89, for example).
# See https://docs.ray.io/en/latest/ray-core/starting-ray.html for details.
ray start --head --port=6379
```

Using [Cloud Storage + FUSE](#cloud-storage) is required in distributed mode, because the Ray cluster and the client machine must operate on the same directory of files. The mapped local directory paths **must be the same** on all machines.
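
For example, a sketch of mounting the same bucket at the same path on every machine (the mount point is a placeholder):

```bash
# Run on the Ray head node, every worker node, and the client machine;
# the mount point must be identical everywhere.
mkdir -p "/path/to/<mybucket>"
gcsfuse <mybucket> "/path/to/<mybucket>"
```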

Run the following code on the client machine to connect to the Ray cluster:
```py
import ray

# Connect to the Ray cluster.
ray.init(address="ray://123.45.67.89:10001")
```

## Configure Space Options

Create a Ray runner linked to a Space dataset or view to run operations in the Ray cluster. Use options to tune performance.

### Data Ingestion

[The WebDataset ingestion example](/notebooks/webdataset_ingestion.ipynb) describes the setup in detail. The options to tune include:

- `max_parallelism`: the ingestion workload runs in parallel on Ray nodes, capped by this parallelism

- `array_record_options`: sets the [options of the ArrayRecord lib](https://github.com/google/array_record/blob/2ac1d904f6be31e5aa2f09549774af65d84bff5a/cpp/array_record_writer.h#L83). Group size is the number of records serialized together in one chunk: a lower value improves random access latency, while a larger value is preferred on Cloud Storage, which performs better for batch reads; a larger group size also reduces the ArrayRecord file size.

```py
# `ds_or_view` is a Space dataset or (materialized) view.
runner = ds_or_view.ray(
    ray_options=RayOptions(max_parallelism=4),
    file_options=FileOptions(
        array_record_options=ArrayRecordOptions(options="group_size:64")
    ))
```

### Data Read

Data read in Ray runner has the following steps:

- Obtain a list of index files to read, based on the filter and version. If a read `batch_size` is provided, each file is further split into row ranges. Each row range becomes a [Ray data block](https://docs.ray.io/en/latest/data/api/doc/ray.data.block.Block.html).

- When reading a block, first read the index file as an Arrow table. If there are record fields, read these fields from ArrayRecord files.

The options to tune include:

- `max_parallelism`: Ray read parallelism; controls the `parallelism` of [Datasource.get_read_tasks](https://docs.ray.io/en/latest/data/api/doc/ray.data.Datasource.get_read_tasks.html#ray.data.Datasource.get_read_tasks)

- `batch_size`: a batch size that is too small produces too many Ray blocks and hurts performance; a batch size that is too large requires reading many records from ArrayRecord files for each Ray block, which can be slow.

Examples of setting read batch size in different scenarios:

```py
ray_options = RayOptions(max_parallelism=4)

# Read the dataset as an iterator of batches.
iterator = ds.ray(ray_options).read(batch_size=64)

# Refresh a materialized view.
mv.ray(ray_options).refresh(batch_size=64)

# Create a Ray Dataset with a read batch size.
ray_ds = ds.ray_dataset(ray_options, ReadOptions(batch_size=64))
```
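
To get a feel for the trade-off, a rough count of the Ray blocks produced for a given batch size (the row count below is hypothetical):

```py
# Hypothetical numbers, only to illustrate how batch size maps to block count.
num_rows = 1_000_000
batch_size = 64

# Each row range of `batch_size` rows becomes one Ray data block.
num_blocks = -(-num_rows // batch_size)  # ceiling division -> 15625 blocks
print(num_blocks)
```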

#### Read Data for Training

Users can choose to store data fields in **Parquet** or **ArrayRecord** files (record fields). Space's performance is similar to other Parquet-based datasets when all fields are in Parquet.

The ArrayRecord reader uses random access reads at the granularity of `group size` records. Random access reads perform well on local or high-performance distributed file systems attached to the reader node (e.g., training VMs). However, read performance degrades drastically on Cloud Storage because of the large number of read RPCs (e.g., one per record). The tips for **Cloud Storage** are:

- For quasi-sequential reads, use a larger `group size` when ingesting data into Space. It can effectively improve read throughput by reducing the number of RPCs, but it helps only when adjacent records are read together.

- For fully randomized reads (the record read order is shuffled), first cache the Space dataset files in a high-performance file system, as shown in the sketch below.
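
A hedged sketch of such caching, copying the dataset directory from Cloud Storage to fast local storage before training (bucket and paths are placeholders):

```bash
# Copy the Space dataset directory to a local SSD before a fully
# shuffled training run; bucket and paths are placeholders.
gsutil -m cp -r "gs://<mybucket>/path/to/space_dataset" /mnt/local_ssd/
```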

Training with random access reads has the benefits of lightweight global shuffling, deterministic training, and checkpointing of training state. See the [Grain](https://github.com/google/grain) framework for more details. Integration with Grain is a TODO.
191 changes: 191 additions & 0 deletions notebooks/webdataset_ingestion.ipynb
@@ -0,0 +1,191 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parallel Data Ingestion from WebDataset\n",
"\n",
"This example ingests data from the [WebDataset](https://github.com/webdataset/webdataset) to a Space dataset. The ingestion operation uses Ray runner to distribute the workload in a Ray cluster.\n",
"\n",
"We use [img2dataset](https://github.com/rom1504/img2dataset) to download popular ML datasets in the WebDataset format. Install the packages and download the COCO dataset following the [guide](https://github.com/rom1504/img2dataset/blob/main/dataset_examples/mscoco.md):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"vscode": {
"languageId": "shellscript"
}
},
"outputs": [],
"source": [
"pip install webdataset img2dataset \n",
"\n",
"wget https://huggingface.co/datasets/ChristophSchuhmann/MS_COCO_2017_URL_TEXT/resolve/main/mscoco.parquet\n",
"\n",
"img2dataset --url_list mscoco.parquet --input_format \"parquet\" \\\n",
" --url_col \"URL\" --caption_col \"TEXT\" --output_format \"webdataset\" \\\n",
" --output_folder mscoco --processes_count 16 --thread_count 64 --image_size 256 \\\n",
" --enable_wandb True"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's assume the downloaded COCO WebDataset directory is \"COCO_DIR\" (local or in Cloud Storage). The next step is creating an empty Space dataset:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pyarrow as pa\n",
"\n",
"from space import DirCatalog\n",
"from space import ArrayRecordOptions, FileOptions\n",
"\n",
"# The schema of a new Space dataset to create.\n",
"schema = pa.schema([\n",
" (\"key\", pa.string()),\n",
" (\"caption\", pa.binary()),\n",
" (\"jpg\", pa.binary())])\n",
"\n",
"catalog = DirCatalog(\"/path/to/my/tables\")\n",
"ds = catalog.create_dataset(\"coco\", schema, primary_keys=[\"key\"],\n",
" record_fields=[\"jpg\"]) # Store \"jpg\" in ArrayRecord files\n",
"\n",
"# Or load an existing dataset.\n",
"# ds = catalog.dataset(\"images_size64\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Connect to a Ray runner and create a Ray runner for the dataset. See how to configure the options in the [setup and performance doc](/docs/performance.md#ray-runner-setup)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import ray\n",
"\n",
"# Connect to a Ray cluster. Or skip it to use a local Ray instance.\n",
"ray.init(address=\"ray://12.34.56.78:10001\")\n",
"\n",
"runner = ds.ray(\n",
" ray_options=RayOptions(max_parallelism=4),\n",
" file_options=FileOptions(\n",
" array_record_options=ArrayRecordOptions(options=\"group_size:64\")\n",
" ))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A WebDataset consists of a directory of tar files. A URL in form of `something-{000000..012345}.tar` representing a shard of the dataset, i.e., a subset of tar files. The following `read_webdataset` method returns an iterator to scan the shard described by a URL."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import webdataset as wds\n",
"\n",
"# The size of a batch returned by the iterator.\n",
"BATCH_SIZE = 64\n",
"\n",
"def read_webdataset(shard_url: str):\n",
" print(f\"Processing URL: {shard_url}\")\n",
"\n",
" def to_dict(keys, captions, jpgs):\n",
" return {\"key\": keys, \"caption\": captions, \"jpg\": jpgs}\n",
"\n",
" ds = wds.WebDataset(shard_url)\n",
" keys, captions, jpgs = [], [], []\n",
" for i, sample in enumerate(ds):\n",
" keys.append(sample[\"__key__\"])\n",
" captions.append(sample[\"txt\"])\n",
" jpgs.append(sample[\"jpg\"])\n",
"\n",
" if len(keys) == BATCH_SIZE:\n",
" yield (to_dict(keys, captions, jpgs))\n",
" keys.clear()\n",
" captions.clear()\n",
" jpgs.clear()\n",
"\n",
" if len(keys) > 0:\n",
" yield (to_dict(keys, captions, jpgs))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The whole COCO dataset has 60 tar files. We split it into multiple shards to ingest data in parallel:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"NUM_SHARDS = 60\n",
"NUM_WORKERS = 4\n",
"\n",
"COCO_DIR = \"/path/to/your/downloaded/coco\"\n",
"\n",
"shards_per_worker = NUM_SHARDS // NUM_WORKERS\n",
"shard_urls = []\n",
"for i in range(NUM_WORKERS):\n",
" start = f\"{i * shards_per_worker:05d}\"\n",
" end = f\"{min(NUM_SHARDS-1, (i + 1) * shards_per_worker - 1):05d}\"\n",
" shard_urls.append(COCO_DIR + f\"/{{{start}..{end}}}.tar\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `append_from` method takes a list of no-arg methods as input. Each method makes a new iterator that reads a shard of input data. The iterators are read in parallel on Ray nodes."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from functools import partial\n",
"import time\n",
"\n",
"# A Ray remote fn to call `append_from` from the Ray cluster.\n",
"@ray.remote\n",
"def run():\n",
" runner.append_from([partial(read_webdataset, url) for url in shard_urls])\n",
"\n",
"# Start ingestion:\n",
"ray.get(run.remote())"
]
},
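{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, verify the ingestion by reading one batch back through the same Ray runner. This is a minimal sketch; the batches are assumed to be Arrow data, following Space's Arrow-based API."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# A quick sanity check after ingestion: read one batch back.\n",
"# Batches are assumed to be Arrow tables (Space uses Arrow in its API surface).\n",
"for batch in runner.read(batch_size=4):\n",
"    print(batch)\n",
"    break"
]
}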
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
2 changes: 1 addition & 1 deletion python/pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "space-datasets"
version = "0.0.8"
version = "0.0.9"
authors = [{ name = "Space team", email = "[email protected]" }]
description = "Unified storage framework for machine learning datasets"
readme = "README.md"
3 changes: 2 additions & 1 deletion python/src/space/__init__.py
@@ -17,7 +17,8 @@
from space.catalogs.base import DatasetInfo
from space.catalogs.directory import DirCatalog
from space.core.datasets import Dataset
from space.core.options import JoinOptions, Range, ReadOptions
from space.core.options import (ArrayRecordOptions, FileOptions, JoinOptions,
ParquetWriterOptions, Range, ReadOptions)
from space.core.runners import LocalRunner
from space.core.random_access import RandomAccessDataSource
from space.core.schema.types import File, TfFeatures
