Skip to content

Commit

Permalink
Correct ls() for parquet backed nested reference sets (#1645)
Browse files Browse the repository at this point in the history
  • Loading branch information
martindurant authored Jul 8, 2024
1 parent 1e2591c commit 71f4907
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 29 deletions.
59 changes: 30 additions & 29 deletions fsspec/implementations/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import math
import os
from itertools import chain
from functools import lru_cache
from typing import TYPE_CHECKING

Expand All @@ -16,10 +17,10 @@
if not TYPE_CHECKING:
import json

from ..asyn import AsyncFileSystem
from ..callbacks import DEFAULT_CALLBACK
from ..core import filesystem, open, split_protocol
from ..utils import isfilelike, merge_offset_ranges, other_paths
from fsspec.asyn import AsyncFileSystem
from fsspec.callbacks import DEFAULT_CALLBACK
from fsspec.core import filesystem, open, split_protocol
from fsspec.utils import isfilelike, merge_offset_ranges, other_paths

logger = logging.getLogger("fsspec.reference")

Expand Down Expand Up @@ -131,7 +132,6 @@ def __init__(
self.out_root = out_root or self.root
self.cat_thresh = categorical_threshold
self.cache_size = cache_size
self.dirs = None
self.url = self.root + "/{field}/refs.{record}.parq"
# TODO: derive fs from `root`
self.fs = fsspec.filesystem("file") if fs is None else fs
Expand Down Expand Up @@ -195,32 +195,36 @@ def create(root, storage_options=None, fs=None, record_size=10000, **kwargs):
fs.pipe("/".join([root, ".zmetadata"]), json.dumps(met).encode())
return LazyReferenceMapper(root, fs, **kwargs)

def listdir(self, basename=True):
@lru_cache()
def listdir(self):
"""List top-level directories"""
# cache me?
if self.dirs is None:
dirs = [p.split("/", 1)[0] for p in self.zmetadata]
self.dirs = {p for p in dirs if p and not p.startswith(".")}
listing = self.dirs
if basename:
listing = [os.path.basename(path) for path in listing]
return listing
dirs = (p.rsplit("/", 1)[0] for p in self.zmetadata if not p.startswith(".z"))
return set(dirs)

def ls(self, path="", detail=True):
"""Shortcut file listings"""
if not path:
dirnames = self.listdir()
others = set(
[".zmetadata"]
+ [name for name in self.zmetadata if "/" not in name]
+ [name for name in self._items if "/" not in name]
)
path = path.rstrip("/")
pathdash = path + "/" if path else ""
dirnames = self.listdir()
dirs = [
d
for d in dirnames
if d.startswith(pathdash) and "/" not in d.lstrip(pathdash)
]
if dirs:
others = {
f
for f in chain(
[".zmetadata"],
(name for name in self.zmetadata),
(name for name in self._items),
)
if f.startswith(pathdash) and "/" not in f.lstrip(pathdash)
}
if detail is False:
others.update(dirnames)
others.update(dirs)
return sorted(others)
dirinfo = [
{"name": name, "type": "directory", "size": 0} for name in dirnames
]
dirinfo = [{"name": name, "type": "directory", "size": 0} for name in dirs]
fileinfo = [
{
"name": name,
Expand All @@ -234,10 +238,7 @@ def ls(self, path="", detail=True):
for name in others
]
return sorted(dirinfo + fileinfo, key=lambda s: s["name"])
parts = path.split("/", 1)
if len(parts) > 1:
raise FileNotFoundError("Cannot list within directories right now")
field = parts[0]
field = path
others = set(
[name for name in self.zmetadata if name.startswith(f"{path}/")]
+ [name for name in self._items if name.startswith(f"{path}/")]
Expand Down
24 changes: 24 additions & 0 deletions fsspec/implementations/tests/test_reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,3 +759,27 @@ def test_append_parquet(lazy_refs, m):
with pytest.raises(KeyError):
lazy2["data/0"]
assert lazy2["data/1"] == b"Adata"


def test_deep_parq(m):
    """Check listing of a nested group stored in a parquet-backed reference set.

    A zarr group ``instant`` holding a dataset ``one`` is written through a
    freshly created ``LazyReferenceMapper`` and flushed.  The reference set is
    then re-opened, the data is read back, and ``ls()`` is checked at the
    root, at the group level and at the array level.

    The test is skipped when ``zarr`` is not importable.

    Parameters
    ----------
    m : filesystem fixture passed as ``fs=`` to the mapper
        (presumably the in-memory filesystem fixture -- confirm in conftest).
    """
    zarr = pytest.importorskip("zarr")
    mapper_cls = fsspec.implementations.reference.LazyReferenceMapper

    # Write phase: build the nested hierarchy and persist the references.
    mapper = mapper_cls.create("memory://out.parq", fs=m)
    group = zarr.open_group(mapper, mode="w")
    nested = group.create_group("instant")
    nested.create_dataset(name="one", data=[1, 2, 3])
    mapper.flush()

    # Read phase: re-open the same location with a new mapper instance.
    mapper = mapper_cls("memory://out.parq", fs=m)
    group = zarr.open_group(mapper)
    assert group.instant.one[:].tolist() == [1, 2, 3]

    def listed_names(path):
        # Sorted "name" fields of the detailed listing for ``path``.
        return sorted(info["name"] for info in mapper.ls(path))

    assert listed_names("") == [".zgroup", ".zmetadata", "instant"]
    assert listed_names("instant") == [
        "instant/.zgroup",
        "instant/one",
    ]
    assert listed_names("instant/one") == [
        "instant/one/.zarray",
        "instant/one/0",
    ]

0 comments on commit 71f4907

Please sign in to comment.