Changing 1-to-M behaviour of on_disk_cache.
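
The 1-to-M case is the one where a single source item expands to multiple cached files, i.e. filepath_fn is a generator yielding several paths per input. Below is a minimal sketch of that pattern, closely following the test code removed in this commit; the archive path and cache directory are hypothetical and the pipeline assumes the archive already exists on disk.

import os

from torchdata.datapipes.iter import FileOpener, IterableWrapper, OnDiskCacheHolder

# Hypothetical, already-downloaded archive; paths are illustrative only.
tar_dp = IterableWrapper(["/tmp/cache/archive.tar"])

def _gen_filepath_fn(tar_path):
    # One tar archive maps onto three cached CSV files (1-to-M).
    for i in range(3):
        yield os.path.join(os.path.dirname(tar_path), "csv", f"{i}.csv")

file_dp = OnDiskCacheHolder(tar_dp, filepath_fn=_gen_filepath_fn)
file_dp = FileOpener(file_dp, mode="rb").load_from_tar()
file_dp = file_dp.map(fn=lambda stream: stream.read().decode(), input_col=1)
file_dp = file_dp.end_caching(
    mode="w",
    filepath_fn=lambda p: os.path.join("/tmp/cache", "csv", os.path.basename(p)),
    skip_read=True,
)

for csv_path in file_dp:  # yields the three cached file paths once written
    print(csv_path)
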
ghstack-source-id: 4bfa54e5646d08eb5601ebe99ad23b044cef52ae
Pull Request resolved: #810
VitalyFedyunin committed Oct 6, 2022
1 parent b373bd2 commit 21b0bca
Showing 2 changed files with 218 additions and 131 deletions.
49 changes: 13 additions & 36 deletions test/test_remote_io.py
@@ -145,48 +145,14 @@ def _filepath_fn(url):
with self.assertRaisesRegex(RuntimeError, "`end_caching` can only be invoked once"):
    _ = tar_cache_dp.end_caching()
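
For context on the guard above: a cache scope can be ended exactly once, and invoking end_caching again on the resulting pipe raises. A minimal sketch with a hypothetical source path:

from torchdata.datapipes.iter import IterableWrapper

dp = IterableWrapper(["/tmp/data.bin"]).on_disk_cache(  # hypothetical path
    filepath_fn=lambda p: p + ".cached",
)
dp = dp.end_caching(mode="wb", same_filepath_fn=True)
dp.end_caching()  # RuntimeError: `end_caching` can only be invoked once
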

# Multiple filepaths
def _gen_filepath_fn(tar_path):
    for i in range(3):
        yield os.path.join(os.path.dirname(tar_path), "csv", f"{i}.csv")

# DataPipe Constructor
file_cache_dp = OnDiskCacheHolder(tar_cache_dp, filepath_fn=_gen_filepath_fn)
file_cache_dp = FileOpener(file_cache_dp, mode="rb")

# Functional API
file_cache_dp = file_cache_dp.load_from_tar()

def _csv_filepath_fn(csv_path):
    return os.path.join(self.temp_dir.name, "csv", os.path.basename(csv_path))

# Read and decode
def _read_and_decode(x):
    return x.read().decode()

file_cache_dp = file_cache_dp.map(fn=_read_and_decode, input_col=1)

file_cache_dp = EndOnDiskCacheHolder(file_cache_dp, mode="w", filepath_fn=_csv_filepath_fn, skip_read=True)

cached_it = iter(file_cache_dp)
for expected_csv_path in _gen_filepath_fn(expected_file_name):

    # Check disabled due to prefetching inside of on_disk_cache
    # self.assertFalse(os.path.exists(expected_csv_path))

    csv_path = next(cached_it)

    # File is cached to disk
    self.assertTrue(os.path.exists(expected_csv_path))
    self.assertEqual(expected_csv_path, csv_path)

# Cache decompressed archive but only check root directory
root_dir = "temp"

file_cache_dp = OnDiskCacheHolder(
    tar_cache_dp, filepath_fn=lambda tar_path: os.path.join(os.path.dirname(tar_path), root_dir)
)
remember_cache_dp_object = file_cache_dp
file_cache_dp = FileOpener(file_cache_dp, mode="rb").load_from_tar()

file_cache_dp = file_cache_dp.end_caching(
    mode="wb",
    filepath_fn=lambda file_path: os.path.join(self.temp_dir.name, root_dir, os.path.basename(file_path)),
@@ -206,12 +172,23 @@ def _read_and_decode(x):
self.assertTrue(os.path.exists(expected_csv_path))
self.assertEqual(expected_csv_path, csv_path)

# This is the situation where the previous process had no chance to release the promise
# files in the file lists; as we are in the same pid, we need to force iterators to
# finish by deleting or exhausting them
del cached_it
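
To see why deleting the iterator is enough: dropping the last reference to a generator-backed iterator triggers its cleanup (promptly in CPython, via refcounting), which is the hook a cache holder can use to release its promise files. A torchdata-independent illustration:

def pipeline():
    try:
        yield "item"
    finally:
        # Runs when the generator is exhausted *or* deleted -- analogous to
        # releasing promise files held by on_disk_cache.
        print("promise released")

it = pipeline()
next(it)  # start the generator without exhausting it
del it    # CPython refcounting closes the generator here -> prints "promise released"
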

if not IS_WINDOWS:
    dl = DataLoader(file_cache_dp, num_workers=3, multiprocessing_context="fork", batch_size=1)
    expected = [[os.path.join(self.temp_dir.name, root_dir, f"{i}.csv")] for i in range(3)] * 3
    res = list(dl)
    self.assertEqual(sorted(expected), sorted(res))

    remember_cache_dp_object._download_everything = True
    workers = 100
    dl = DataLoader(file_cache_dp, num_workers=workers, multiprocessing_context="fork", batch_size=1)
    expected = [[os.path.join(self.temp_dir.name, root_dir, f"{i}.csv")] for i in range(3)] * workers
    res = list(dl)
    self.assertEqual(sorted(expected), sorted(res))
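
The block above stress-tests the cache under fork-based parallelism; note that _download_everything is a private attribute this commit exercises, not a public API. A condensed sketch of the same pattern, with hypothetical paths and a hypothetical build_pipeline helper:

import os

from torch.utils.data import DataLoader
from torchdata.datapipes.iter import FileOpener, IterableWrapper, OnDiskCacheHolder

def build_pipeline(tar_path, out_dir):
    # Keep a handle on the holder so internal flags can be toggled later.
    holder = OnDiskCacheHolder(
        IterableWrapper([tar_path]),
        filepath_fn=lambda p: os.path.join(out_dir, "extracted"),
    )
    dp = FileOpener(holder, mode="rb").load_from_tar()
    dp = dp.end_caching(
        mode="wb",
        filepath_fn=lambda p: os.path.join(out_dir, "extracted", os.path.basename(p)),
    )
    return holder, dp

holder, dp = build_pipeline("/tmp/archive.tar", "/tmp/out")  # illustrative paths
holder._download_everything = True  # private flag: each worker materializes all files
dl = DataLoader(dp, num_workers=100, multiprocessing_context="fork", batch_size=1)
results = sorted(path for batch in dl for path in batch)
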

@skipIfNoFSSpecS3
def test_fsspec_io_iterdatapipe(self):
    input_list = [
