Commit
Use pattern instead of directory in file loaders, update TFDS example
coufon committed Jan 13, 2024
1 parent 88c2b7a commit 9db1a1f
Showing 8 changed files with 159 additions and 101 deletions.
156 changes: 120 additions & 36 deletions notebooks/tfds_coco_tutorial.ipynb
@@ -6,7 +6,27 @@
"source": [
"## Manage Tensorflow COCO dataset\n",
"\n",
"[TFDS COCO dataset](https://www.tensorflow.org/datasets/catalog/coco) defines the following features structure `tf_features_dict`. It is used for serializing complex nested data into bytes, and deserialize it back."
"This example will load the [TFDS COCO dataset](https://www.tensorflow.org/datasets/catalog/coco) into Space without copying ArrayRecord files. We will demonstrate how to modify data and use SQL engine to analyze annotations.\n",
"\n",
"First let's download the COCO datasets in ArrayRecord format by following the [TFDS docs](https://www.tensorflow.org/datasets/tfless_tfds)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import tensorflow_datasets as tfds\n",
"\n",
"tfds.data_source('coco/2017')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The TFDS COCO dataset defines the following features structure `tf_features_dict`. It is used for serializing complex nested data into bytes:"
]
},
{
@@ -20,6 +40,8 @@
"\n",
"tf_features_dict = f.FeaturesDict({\n",
" \"image\": f.Image(shape=(None, None, 3), dtype=np.uint8),\n",
" \"image/filename\": f.Text(),\n",
" \"image/id\": np.int64,\n",
" \"objects\": f.Sequence({\n",
" \"area\": np.int64,\n",
" \"bbox\": f.BBoxFeature(),\n",
@@ -34,7 +56,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"In this example, we move the COCO dataset from TFDS to Space. In addition, we copy the `objects` field above from the row-oriented files to Parquet files, so we can run SQL queries on it.\n",
"We will make a copy of the above `objects` field into new Parquet files. This field will thus exist in both ArrayRecord files (original TFDS data, for feeding into training framework), and in Parquet for SQL queries. Note that the bulky image data is not copied to Parquet.\n",
"\n",
"The Space dataset's schema is:"
]
@@ -48,6 +70,7 @@
"import pyarrow as pa\n",
"from space import TfFeatures # A custom PyArrow type.\n",
"\n",
"# Equivalent to the `objects` field in the above FeaturesDict.\n",
"object_schema = pa.struct([\n",
" (\"area\", pa.int64()),\n",
" (\"bbox\", pa.list_(pa.float32())), # TODO: to use fixed size list.\n",
@@ -59,8 +82,8 @@
"ds_schema = pa.schema([\n",
" (\"id\", pa.int64()),\n",
" (\"filename\", pa.string()),\n",
" (\"objects\", pa.list_(object_schema)),\n",
" (\"features\", TfFeatures(tf_features_dict))\n",
" (\"objects\", pa.list_(object_schema)), # A copy of `objects` in Parquet files.\n",
" (\"features\", TfFeatures(tf_features_dict)) # The original TFDS data.\n",
"])"
]
},
@@ -77,18 +100,20 @@
"metadata": {},
"outputs": [],
"source": [
"# record_fields will be stored in ArrayRecord files.\n",
"ds = Dataset.create(\"/path/to/space/mybucket/demo\",\n",
" ds_schema, primary_keys=[\"id\"], record_fields=[\"features\"])"
"from space import Dataset\n",
"\n",
"ds_location = \"/directory/coco_demo\" # Change it to your preferred location\n",
"\n",
"ds = Dataset.create(ds_location, ds_schema,\n",
" primary_keys=[\"id\"],\n",
" record_fields=[\"features\"]) # The `features` field is stored in ArrayRecord files."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following code defines a method `index_fn` that reads ArrayRecord files and builds indexes for it. The method returns three index fields (`id`, `filename`, `objects`) to be written into the Space dataset's Parquet files. At the same time, the row's address in the input ArrayRecord files are also persisted.\n",
"\n",
"Calling `load_array_record` will processes all ArrayRecord files in the folder `/path/to/tfds/coco/files` using this method. The COCO dataset is now under Space's management after the call completes."
"The following code defines a method `index_fn` that reads ArrayRecord files and builds indexes for it. The method returns three index fields (`id`, `filename`, `objects`) to be written to Parquet files, together with the row's addresses in the ArrayRecord files. See the [storage design](../docs/design.md) for details."
]
},
{
@@ -97,28 +122,64 @@
"metadata": {},
"outputs": [],
"source": [
"from typing import Any, Dict\n",
"from typing import Any, Dict, List\n",
"\n",
"def pydict_to_pylist(objects: Dict[str, Any]) -> List[Dict[str, Any]]:\n",
" return [\n",
" {\"id\": area, \"area\": id_, \"bbox\": boxes, \"is_crowd\": is_crowds, \"label\": labels}\n",
" for area, id_, boxes, is_crowds, labels in\n",
" zip(objects[\"area\"], objects[\"id\"], objects[\"bbox\"], objects[\"is_crowd\"], objects[\"label\"])\n",
" ]\n",
"\n",
"def index_fn(example: Dict[str, Any]) -> Dict[str, Any]:\n",
" # Input format:\n",
" # key: Space record field name, value: [deserialized TFDS value] (size is 1)\n",
" # e.g., {\"features\": [{\"image\": v, \"image/id\": v, \"image/filename\": v, \"objects\": v}]}\n",
" example = example[\"features\"][0]\n",
" return {\n",
" \"id\": example[\"image/id\"],\n",
" \"filename\": example[\"image/filename\"],\n",
" \"objects\": coco_utils.tf_objects_to_pylist(example[\"objects\"]),\n",
" }\n",
" \"objects\": pydict_to_pylist(example[\"objects\"]),\n",
" }"
]
},
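For intuition, here is a purely illustrative sketch of what `index_fn` receives and returns for a single record; the values below are made up and only follow the input format described in the comments above:

```python
# Illustrative only: a fabricated deserialized TFDS record, wrapped in the
# {record-field-name: [value]} input format that index_fn expects.
batch = {
    "features": [{
        "image": b"...",  # bulky image bytes; not indexed
        "image/id": 524291,
        "image/filename": b"000000524291.jpg",
        "objects": {
            "area": [1234],
            "bbox": [[0.1, 0.2, 0.3, 0.4]],
            "id": [42],
            "is_crowd": [False],
            "label": [18],
        },
    }]
}

index_fn(batch)
# {'id': 524291,
#  'filename': b'000000524291.jpg',
#  'objects': [{'area': 1234, 'id': 42, 'bbox': [0.1, 0.2, 0.3, 0.4],
#               'is_crowd': False, 'label': 18}]}
```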
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Calling `load_array_record` will processes all input ArrayRecord files using `index_fn` to obtain indexes. The loading will complete after the index fields have been written for all ArrayRecord records."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TFDS downloaded files, replace it with your path\n",
"input_pattern = \"/tensorflow_datasets/coco/2017/1.1.0/coco-validation.array_record*\"\n",
"\n",
"runner = ds.local()\n",
"# \"/path/to/tfds/coco/files\" is where TFDS saves the downloaded\n",
"# ArrayRecord files.\n",
"runner.load_array_record(\"/path/to/tfds/coco/files\", index_fn)\n",
"ds.add_tag(\"initialized\") # Tag the current version."
"ds.local().append_array_record(input_pattern, index_fn)\n",
"# >>>\n",
"# JobResult(state=<State.SUCCEEDED: 1>, storage_statistics_update=num_rows: 5000\n",
"# index_compressed_bytes: 31842\n",
"# index_uncompressed_bytes: 47048\n",
"# record_uncompressed_bytes: 816568313\n",
"# , error_message=None)\n",
"\n",
"ds.add_tag(\"initialized\") # Tag the current version.\n",
"\n",
"# Check loaded image IDs.\n",
"image_ids = ds.local().read_all(fields=[\"id\"])\n",
"image_ids.num_rows"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now the `objects` field in TFDS becomes a columnar field that can be analyzed via SQL:"
"Objects are stored in a columnar field `objects` now. Read it into memory as a PyArrow table, and use [DuckDB](https://github.com/duckdb/duckdb) SQL to query it:"
]
},
{
@@ -129,19 +190,21 @@
"source": [
"import duckdb\n",
"\n",
"# Load the \"objects\" column into memory as PyArrow and query using DuckDB.\n",
"# The SQL query returns the largest object bbox area in the dataset.\n",
"objects = runner.read_all(fields=[\"objects\"])\n",
"duckdb.sql(\n",
" \"SELECT MAX(objs.area) FROM (SELECT unnest(objects) AS objs FROM objects)\"\n",
").fetchall()"
"# Compute the min/max object bbox area in the dataset.\n",
"sql = \"\"\"\n",
"SELECT MIN(objs.area), MAX(objs.area) FROM (\n",
" SELECT UNNEST(objects) AS objs FROM objects)\n",
"\"\"\"\n",
"\n",
"objects = ds.local().read_all(fields=[\"objects\"])\n",
"duckdb.sql(sql).fetchall()"
]
},
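The same unnest pattern extends to other aggregations; as an illustrative follow-up, one could count annotated objects per label:

```python
# Illustrative follow-up query: number of annotated objects per label.
duckdb.sql("""
SELECT objs.label, COUNT(*) AS num_objects FROM (
  SELECT UNNEST(objects) AS objs FROM objects)
GROUP BY objs.label ORDER BY num_objects DESC LIMIT 5
""").fetchall()
```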
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Space supports data mutations; each modification generates a new version (`snapshot_id`). It supports reading any previous versions (time travel)."
"Space datasets are mutable. You can run append, insert, upsert, delete operations, locally (ds.local()) or distributedly (ds.ray()). A new snapshot of dataset is generated after a mutation, you can read previous snapshots by providing a snapshot ID or a tag."
]
},
{
@@ -152,21 +215,26 @@
"source": [
"import pyarrow.compute as pc\n",
"\n",
"# Delete a row from a Space dataset. The mutation creates a new snapshot.\n",
"runner.delete(pc.field(\"id\") == pc.scalar(361586))\n",
"# Delete a row from a Space dataset. The mutation creates a new snapshot\n",
"ds.local().delete(pc.field(\"id\") == pc.scalar(361586))\n",
"ds.add_tag(\"delete_some_data\") # Tag the new snapshot\n",
"\n",
"# Read the current version:\n",
"runner.read()\n",
"# Check total rows\n",
"ds.local().read_all(fields=[\"id\"]).num_rows\n",
"# >>>\n",
"# 4999\n",
"\n",
"# Time travel back to before the deletion, by setting a read version.\n",
"runner.read(version=\"initialized\")"
"ds.local().read_all(version=\"initialized\", fields=[\"id\"]).num_rows\n",
"# >>>\n",
"# 5000"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Read data from the Space dataset through a [random access data source interface](https://www.tensorflow.org/datasets/tfless_tfds)."
"Read data (the `features` field) in the original TFDS format via a [random access data source interface](https://www.tensorflow.org/datasets/tfless_tfds), as the input of training frameworks:"
]
},
{
@@ -179,11 +247,27 @@
"\n",
"datasource = RandomAccessDataSource(\n",
" # field-name: storage-location, for reading data from ArrayRecord files.\n",
" {\n",
" \"features\": \"/path/to/space/mybucket/demo\",\n",
" },\n",
" # Auto deserialize data using `tf_features_dict`.\n",
" deserialize=True)"
" {\"features\": ds_location},\n",
" deserialize=True, # Auto deserialize data using `tf_features_dict`.\n",
" use_array_record_data_source=False)\n",
"\n",
"len(datasource)\n",
"# >>>\n",
"# 4999\n",
"\n",
"# Read the original TFDS data.\n",
"datasource[11]\n",
"# >>>\n",
"# {'image': array([[[239, 239, 237],\n",
"# [239, 239, 241],\n",
"# [239, 239, 239],\n",
"# ...\n",
"# [245, 246, 240]]], dtype=uint8),\n",
"# 'image/filename': b'000000292082.jpg', ...\n",
"# 'bbox': array([[0.51745313, 0.30523437, 0.6425156 , 0.3789375 ], ...\n",
"# 'id': array([ 294157, 467036, 1219153, 1840967, 1937564]),\n",
"# 'is_crowd': array([False, False, False, False, False]),\n",
"# 'label': array([27, 0, 0, 27, 56])}}"
]
}
],
12 changes: 8 additions & 4 deletions python/src/space/core/loaders/array_record.py
@@ -15,12 +15,12 @@
"""Load ArrayRecord files into Space datasets."""

from typing import Any, Callable, Dict, List, Optional, Tuple
import glob

import pyarrow as pa
from typing_extensions import TypeAlias

from space.core.fs.array_record import read_record_file
from space.core.loaders.utils import list_files
from space.core.proto import metadata_pb2 as meta
from space.core.proto import runtime_pb2 as rt
from space.core.ops import utils
@@ -38,13 +38,17 @@ class LocalArrayRecordLoadOp(StoragePathsMixin):

# pylint: disable=too-many-arguments
def __init__(self, location: str, metadata: meta.StorageMetadata,
input_dir: str, index_fn: ArrayRecordIndexFn,
pattern: str, index_fn: ArrayRecordIndexFn,
file_options: FileOptions):
"""
Args:
pattern: file path pattern of the input ArrayRecord files, e.g.,
"/directory/*.array_record"
"""
StoragePathsMixin.__init__(self, location)
self._file_options = file_options

self._metadata = metadata
self._input_dir = input_dir
self._index_fn = index_fn

record_fields = set(self._metadata.schema.record_fields)
@@ -62,7 +66,7 @@ def __init__(self, location: str, metadata: meta.StorageMetadata,
self._record_field = self._record_fields[0]

self._serializer = DictSerializer.create(logical_schema)
self._input_files = list_files(input_dir, substr=".array_record")
self._input_files = glob.glob(pattern)

def write(self) -> Optional[rt.Patch]:
"""Write index files to load ArrayRecord files to Space dataset."""
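For context, a minimal sketch of how the new `pattern` argument resolves input files (the path below is hypothetical; TFDS names ArrayRecord shards like `coco-validation.array_record-00000-of-XXXXX`):

```python
import glob

# Hypothetical path; a trailing "*" matches every ArrayRecord shard.
pattern = "/tensorflow_datasets/coco/2017/1.1.0/coco-validation.array_record*"
input_files = glob.glob(pattern)  # one entry per shard, in arbitrary order
```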
13 changes: 8 additions & 5 deletions python/src/space/core/loaders/parquet.py
@@ -15,11 +15,11 @@
"""Load Parquet files into Space datasets."""

from typing import Optional
import glob

import pyarrow.parquet as pq

from space.core.manifests import IndexManifestWriter
from space.core.loaders.utils import list_files
from space.core.proto import metadata_pb2 as meta
from space.core.proto import runtime_pb2 as rt
from space.core.ops import utils
@@ -31,18 +31,21 @@ class LocalParquetLoadOp(StoragePathsMixin):
"""Load ArrayRecord files into Space without copying data."""

def __init__(self, location: str, metadata: meta.StorageMetadata,
input_dir: str):
pattern: str):
"""
Args:
pattern: file path pattern of the input Parquet files, e.g.,
"/directory/*.parquet"
"""
StoragePathsMixin.__init__(self, location)

self._metadata = metadata
self._input_dir = input_dir

assert len(self._metadata.schema.record_fields) == 0

self._physical_schema = arrow.arrow_schema(self._metadata.schema.fields,
set(),
physical=True)
self._input_files = list_files(input_dir, suffix=".parquet")
self._input_files = glob.glob(pattern)

def write(self) -> Optional[rt.Patch]:
"""Write metadata files to load Parquet files to Space dataset."""
32 changes: 0 additions & 32 deletions python/src/space/core/loaders/utils.py

This file was deleted.
