diff --git a/src/lightning/data/streaming/writer.py b/src/lightning/data/streaming/writer.py index 98b7a31e07d1b..db20d93460a2b 100644 --- a/src/lightning/data/streaming/writer.py +++ b/src/lightning/data/streaming/writer.py @@ -322,6 +322,8 @@ def write_chunk_to_file( def write_chunks_index(self) -> str: """Write the chunks index to a JSON file.""" + if len(self._chunks_info) == 0: + return "" filepath = os.path.join(self._cache_dir, f"{self.rank}.{_INDEX_FILENAME}") config = self.get_config() with open(filepath, "w") as out: @@ -393,7 +395,6 @@ def _merge_no_wait(self, node_rank: Optional[int] = None) -> None: config = data["config"] elif config != data["config"]: - breakpoint() raise Exception("The config isn't consistent between chunks. This shouldn't have happened.") chunks_info.extend(data["chunks"]) diff --git a/tests/tests_data/processing/test_data_processor.py b/tests/tests_data/processing/test_data_processor.py index 06420130321a9..797d4911d1282 100644 --- a/tests/tests_data/processing/test_data_processor.py +++ b/tests/tests_data/processing/test_data_processor.py @@ -564,6 +564,9 @@ def test_data_processsor_nlp(tmpdir, monkeypatch): data_processor = DataProcessor(input_dir=str(tmpdir), num_workers=1, num_downloaders=1) data_processor.run(TextTokenizeRecipe(chunk_size=1024 * 11)) + data_processor_more_wokers = DataProcessor(input_dir=str(tmpdir), num_workers=2, num_downloaders=1) + data_processor_more_wokers.run(TextTokenizeRecipe(chunk_size=1024 * 11)) + class ImageResizeRecipe(DataTransformRecipe): def prepare_structure(self, input_dir: str):