diff --git a/.gitignore b/.gitignore
index 8bcb63e..68bd129 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,8 @@
 .DS_Store
 *.pem
+.idea
+
 dist/**/*
 .virtualenvs
diff --git a/README.md b/README.md
index 8a255c6..6839cab 100644
--- a/README.md
+++ b/README.md
@@ -12,9 +12,9 @@ https://github.com/tangledpath/csv-batcher
 ## Documentation
 https://tangledpath.github.io/csv-batcher/csv_batcher.html
-## TODO
-* Better integrate results from callbacks
-* Maybe implement pooling with celery (for use in django apps, etc.), which can bring about [horizontal scaling]([url](https://en.wikipedia.org/wiki/Scalability#Horizontal_or_scale_out)).
+## Further exercises
+* Possibly implement pooling with celery (for use in Django apps, etc.), which can bring about [horizontal scaling](https://en.wikipedia.org/wiki/Scalability#Horizontal_or_scale_out).
+
 ## Usage
 Arguments sent to callback function can be controlled by creating pooler with `callback_with` and the CallbackWith enum
@@ -22,88 +22,93 @@ values:
 ### As dataframe row
 ```python
-  from csv_batcher.csv_pooler import CSVPooler, CallbackWith
+from csv_batcher.csv_pooler import CSVPooler, CallbackWith
 
-  # Callback function passed to pooler; accepts a dataframe row
-  # as a pandas Series (via apply)
-  def process_dataframe_row(row):
+# Callback function passed to pooler; accepts a dataframe row
+# as a pandas Series (via apply)
+def process_dataframe_row(row):
     return row.iloc[0]
 
-  pooler = CSVPooler(
+pooler = CSVPooler(
     "5mSalesRecords.csv",
     process_dataframe_row,
     callback_with=CallbackWith.DATAFRAME_ROW,
     pool_size=16
-  )
-  pooler.process()
+)
+for processed_batch in pooler.process():
+    print(processed_batch)
+```
 
 ### As dataframe
 ```python
-  from csv_batcher.csv_pooler import CSVPooler, CallbackWith
+from csv_batcher.csv_pooler import CSVPooler, CallbackWith
 
-  # Used in DataFrame.apply:
-  def process_dataframe_row(row):
+# Used from process_dataframe's apply:
+def process_dataframe_row(row):
     return row.iloc[0]
 
-  # Callback function passed to pooler; accepts a dataframe:
-  def process_dataframe(df):
+# Callback function passed to pooler; accepts a dataframe:
+def process_dataframe(df):
     foo = df.apply(process_dataframe_row, axis=1)
     # Or do something more complicated....
     return len(df)
 
-  pooler = CSVPooler(
+pooler = CSVPooler(
     "5mSalesRecords.csv",
     process_dataframe,
     callback_with=CallbackWith.DATAFRAME,
     pool_size=16
-  )
-  pooler.process()
+)
+for processed_batch in pooler.process():
+    print(processed_batch)
+```
 
 ### As CSV filename
 ```python
-  from csv_batcher.csv_pooler import CSVPooler, CallbackWith
+import pandas as pd
+from csv_batcher.csv_pooler import CSVPooler, CallbackWith
 
-  def process_csv_filename(csv_chunk_filename):
-    # print("processing ", csv_chunk_filename)
-    df = pd.read_csv(csv_chunk_filename, skipinitialspace=True, index_col=None)
-    foo = df.apply(process_dataframe_row, axis=1)
-    return len(df)
-
-  def process_as_dataframe(df):
-    foo = df.apply(process_dataframe_row, axis=1)
-    return len(df)
+# Used from process_csv_filename's apply:
+def process_dataframe_row(row):
+    return row.iloc[0]
 
-  def process_dataframe_row(row):
-    return row.iloc[0]
+def process_csv_filename(csv_chunk_filename):
+    # print("processing ", csv_chunk_filename)
+    df = pd.read_csv(csv_chunk_filename, skipinitialspace=True, index_col=None)
+    foo = df.apply(process_dataframe_row, axis=1)
+    return len(df)
 
-  pooler = CSVPooler(
+pooler = CSVPooler(
     "5mSalesRecords.csv",
-    process_dataframe,
-    callback_with=CallbackWith.CSV_FILENAME
+    process_csv_filename,
+    callback_with=CallbackWith.CSV_FILENAME,
     chunk_lines=10000,
     pool_size=16
-  )
-  pooler.process()
+)
+for processed_batch in pooler.process():
+    print(processed_batch)
 ```
 
 ## Development
 ### Linting
 ```bash
-  ruff check . # Find linting errors
-  ruff check . --fix # Auto-fix linting errors (where possible)
+ruff check .        # Find linting errors
+ruff check . --fix  # Auto-fix linting errors (where possible)
 ```
 
 ### Documentation
 ```
-  # Shows in browser
-  poetry run pdoc csv_batcher
-  # Generates to ./docs
-  poetry run pdoc csv_batcher -o ./docs
+# Shows in browser
+poetry run pdoc csv_batcher
+# Generates to ./docs
+poetry run pdoc csv_batcher -o ./docs
 ```
 
 ### Testing
 ```bash
-  clear; pytest
+clear; pytest
 ```
 
 ### Publishing
-`poetry publish --build -u __token__ -p $PYPI_TOKEN`
+```bash
+poetry publish --build -u __token__ -p $PYPI_TOKEN
+```
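All three README examples now consume `process()` as a generator. A minimal, self-contained sketch of the same pattern, assuming this PR is applied; the throwaway `tiny.csv` and the `count_rows` callback are illustrative, not part of the repo:

```python
# Self-contained sketch of the new generator-style API (assumes the
# patched csv-batcher is installed; tiny.csv is created on the fly).
import csv

from csv_batcher.csv_pooler import CSVPooler, CallbackWith

def count_rows(df):
    # Callback for CallbackWith.DATAFRAME: returns one result per chunk.
    return len(df)

if __name__ == "__main__":
    # Build a small CSV so the sketch runs anywhere.
    with open("tiny.csv", "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["id", "value"])
        writer.writerows([i, i * i] for i in range(25))

    pooler = CSVPooler(
        "tiny.csv",
        count_rows,
        callback_with=CallbackWith.DATAFRAME,
        chunk_lines=10,  # small chunks to force multiple batches
        pool_size=2,
    )
    # process() now yields each chunk's callback result in chunk order;
    # summing the per-chunk row counts recovers the total (25).
    print(sum(pooler.process()))
```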
diff --git a/csv_batcher/csv_pooler.py b/csv_batcher/csv_pooler.py
index 1b7d295..0f3cfb8 100644
--- a/csv_batcher/csv_pooler.py
+++ b/csv_batcher/csv_pooler.py
@@ -66,8 +66,9 @@ def process(self):
             csv_file_cnt = len(csv_splitter.csv_files())
             logging.info(f"Pooling against {csv_file_cnt} files")
             with Pool(self.pool_size) as p:
-                for result in p.imap(self._process_csv, csv_splitter.csv_files()):
-                    processed_count += result
+                for result, count in p.imap(self._process_csv, csv_splitter.csv_files()):
+                    yield result
+                    processed_count += count
         finally:
             csv_splitter.cleanup()
 
@@ -75,17 +76,17 @@ def process(self):
     def _process_csv(self, csv_chunk_filename):
         if self.callback_with == CallbackWith.CSV_FILENAME:
-            self.process_fn(csv_chunk_filename)
+            result = self.process_fn(csv_chunk_filename)
             with open(csv_chunk_filename) as f:
                 # Get total lines and subtract for header:
-                result = sum(1 for line in f) - 1
+                count = sum(1 for line in f) - 1
         elif self.callback_with == CallbackWith.DATAFRAME:
             df = pd.read_csv(csv_chunk_filename, skipinitialspace=True, index_col=None)
-            result = df.shape[0]
-            self.process_fn(df)
+            count = df.shape[0]
+            result = self.process_fn(df)
         elif self.callback_with == CallbackWith.DATAFRAME_ROW:
             df = pd.read_csv(csv_chunk_filename, skipinitialspace=True, index_col=None)
-            result = df.shape[0]
-            df.apply(self.process_fn, axis=1)
+            count = df.shape[0]
+            result = df.apply(self.process_fn, axis=1)
 
-        return result
+        return result, count
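The core of the change, isolated from the CSV details: `_process_csv` now returns a `(result, count)` pair, and `process()` becomes a generator that yields each chunk's result while still tallying row counts. A toy sketch of that control flow, with hypothetical stand-in names rather than the library's code:

```python
# Toy model of the patched control flow: each worker returns a
# (callback_result, rows_processed) pair; the coordinator yields
# the results to the caller and accumulates the counts internally.
from multiprocessing import Pool

def process_chunk(chunk):
    # Stand-in for _process_csv: one (result, count) pair per chunk.
    return sum(chunk), len(chunk)

def process(chunks, pool_size=2):
    processed_count = 0
    with Pool(pool_size) as p:
        # imap yields in submission order while workers run in parallel.
        for result, count in p.imap(process_chunk, chunks):
            yield result
            processed_count += count
    print(f"Processed {processed_count} rows")

if __name__ == "__main__":
    # Exhausting the generator prints "Processed 6 rows", then [6, 9, 6].
    print(list(process([[1, 2, 3], [4, 5], [6]])))
```

One behavioral consequence worth noting in review: because generators suspend at `yield`, the `csv_splitter.cleanup()` in the `finally` block now runs only once the caller exhausts (or closes) the generator, not when `process()` first returns its iterator.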
diff --git a/csv_batcher/test_csv_pooler.py b/csv_batcher/test_csv_pooler.py
index c061554..e54ec38 100644
--- a/csv_batcher/test_csv_pooler.py
+++ b/csv_batcher/test_csv_pooler.py
@@ -1,4 +1,3 @@
-import unittest
 from csv_batcher.utils.time import time_and_log
 from csv_batcher.csv_pooler import CSVPooler, CallbackWith
 import pandas as pd
@@ -17,22 +16,29 @@ def __process_as_dataframe(df):
 def test_big_file_as_csv():
     with time_and_log("test_big_file_as_csv"):
         pooler = CSVPooler("5mSalesRecords.csv", __process_csv_filename)
-        pooler.process()
+        for processed_batch in pooler.process():
+            assert isinstance(processed_batch, pd.Series)
 
 def test_big_file_as_dataframe():
     with time_and_log("test_big_file_as_dataframe"):
         pooler = CSVPooler("5mSalesRecords.csv", __process_as_dataframe, callback_with=CallbackWith.DATAFRAME)
-        pooler.process()
+        for processed_batch in pooler.process():
+            assert isinstance(processed_batch, pd.Series)
 
 def test_big_file_as_dataframe_rows():
     with time_and_log("test_big_file_as_dataframe_rows"):
         pooler = CSVPooler("5mSalesRecords.csv", __process_dataframe_row, callback_with=CallbackWith.DATAFRAME_ROW)
-        pooler.process()
+        for processed_batch in pooler.process():
+            assert isinstance(processed_batch, pd.Series)
 
 def test_no_pooler():
     with time_and_log("test_no_pooler"):
         __process_csv_filename("5mSalesRecords.csv")
 
-if __name__ == "__main__":
-    unittest.main()
+if __name__ == '__main__':
+    test_big_file_as_csv()
+    test_big_file_as_dataframe()
+    test_big_file_as_dataframe_rows()
+    # test_migrator_idempotency()
+
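One observation on the assertions above: per the patched `_process_csv`, only the `DATAFRAME_ROW` path yields the result of `df.apply(..., axis=1)`, which is a pandas `Series` for scalar-returning callbacks; the other two modes yield whatever the callback returns, so the `Series` assertions hold only if the private helpers return one. An illustrative sketch, with hypothetical helpers that are not part of this test suite:

```python
# Illustrative only: the type a caller receives from process()
# depends on callback_with and on what the callback returns.
import pandas as pd

from csv_batcher.csv_pooler import CSVPooler, CallbackWith

def first_column(row):
    return row.iloc[0]

def row_count(df):
    return len(df)

def check_yielded_types(csv_path):
    # DATAFRAME_ROW yields df.apply(fn, axis=1): a Series for
    # scalar-returning callbacks like first_column.
    pooler = CSVPooler(csv_path, first_column,
                       callback_with=CallbackWith.DATAFRAME_ROW)
    for batch in pooler.process():
        assert isinstance(batch, pd.Series)

    # DATAFRAME yields whatever the callback returns (an int here),
    # so a Series assertion would be too strict for this callback.
    pooler = CSVPooler(csv_path, row_count,
                       callback_with=CallbackWith.DATAFRAME)
    for batch in pooler.process():
        assert isinstance(batch, int)
```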

diff --git a/docs/csv_batcher.html b/docs/csv_batcher.html
index d562e1c..bd132ed 100644
--- a/docs/csv_batcher.html
+++ b/docs/csv_batcher.html
@@ -25,9 +25,11 @@