Skip to content

Commit

Permalink
Merge pull request #117 from zpz/zepu
Browse files Browse the repository at this point in the history
Zepu
  • Loading branch information
zpz authored Sep 27, 2023
2 parents eae9854 + 19e07df commit 07bf371
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 12 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).


## [0.8.6] - 2023-08-26

- Made ``zstandard`` a required (as opposed to optional) dependency, because ``Biglist.DEFAULT_STORAGE_FORMAT`` defaults to ``pickle-zstd``.
- Decreased default value of ``Biglist._n_write_threads`` from 8 to 4.


## [0.8.5] - 2023-08-25

- ``BiglistBase`` now has custom support for pickling.
Expand Down
6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies = [
"deprecation",
"pyarrow >= 12.0.1",
"typing-extensions",
"zstandard",
]
requires-python = ">=3.8"
readme = "README.rst"
Expand All @@ -30,7 +31,7 @@ Source = "https://github.com/zpz/biglist"


[project.optional-dependencies]
zstandard = ["zstandard"]
zstandard = []
lz4 = ["lz4"]
doc = [
"sphinx < 7.2.0",
Expand All @@ -49,6 +50,9 @@ test = [
]


# `zstandard` has become required dependency.
# The `zstandard` extra will be removed later.

# See https://beta.ruff.rs/docs/rules/
[tool.ruff]
target-version = "py38"
Expand Down
2 changes: 1 addition & 1 deletion src/biglist/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,4 @@
Slicer,
)

__version__ = "0.8.5"
__version__ = "0.8.6"
1 change: 1 addition & 0 deletions src/biglist/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def load(self) -> None:


FileReaderType = TypeVar("FileReaderType", bound=FileReader)
'''This type variable indicates the class :class:`FileReader` or a subclass thereof.'''


class FileSeq(Seq[FileReaderType]):
Expand Down
24 changes: 18 additions & 6 deletions src/biglist/_biglist.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ def new(
``serialize_kwargs`` and ``deserialize_kwargs`` are rarely needed.
One use case is ``schema`` when storage format is "parquet".
See :class:`~biglist._biglist.ParquetSerializer`.
``serialize_kwargs`` and ``deserialize_kwargs``, if not ``None``,
will be saved in the "info.json" file, hence they must be JSON
Expand Down Expand Up @@ -298,7 +299,15 @@ def __init__(
self._append_buffer: list = []
self._append_files_buffer: list = []
self._file_dumper = None
self._n_write_threads = 8

self._n_write_threads = 4
'''This value affects memory demand during quick "appending" (and flushing/dumping in the background).
If the memory consumption of each batch is large, you could manually set this to a lower value, like::
lst = Biglist(path)
lst._n_write_threads = 4
'''

self._serialize_kwargs = self.info.get("serialize_kwargs", {})
self._deserialize_kwargs = self.info.get("deserialize_kwargs", {})
if self.storage_format == "parquet" and "schema_spec" in self._serialize_kwargs:
Expand Down Expand Up @@ -496,11 +505,13 @@ def extend(self, x: Iterable[Element]) -> None:

def make_file_name(self, buffer_len: int, extra: str = '') -> str:
'''
If you need to customize the data file name for any reason, you should do that via ``extra``
This method constructs the file name of a data file.
If you need to customize this method for any reason, you should do it via ``extra``
and keep the other patterns unchanged.
The string ``extra`` will appear between other fixed patterns in the file name.
One possible usecase is this: in distributed writing, you want files written by different workers
to be distinguishable by the file names. Do something like this:
to be distinguishable by the file names. Do something like this::
def worker(datapath: str, worker_id: str, ...):
out = Biglist(datapath)
Expand Down Expand Up @@ -909,10 +920,11 @@ class Multiplexer:
The usage consists of two main parts:
1. In "controller" code, call :meth:`start` to start a new "session".
Different sessions (at the same time or otherwise) are independent consumers of the data.
Different sessions (at the same time or otherwise) are independent consumers of the data.
2. In "worker" code, use the session ID that was returned by :meth:`start` to instantiate
a Multiplexer and iterate over it. In so doing, multiple workers will obtain the data elements
collectively, i.e., each element is obtained by exactly one worker.
a Multiplexer and iterate over it. In so doing, multiple workers will obtain the data elements
collectively, i.e., each element is obtained by exactly one worker.
"""

@classmethod
Expand Down
9 changes: 5 additions & 4 deletions src/biglist/_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ def get_gcsfs(cls, *, good_for_seconds=600) -> GcsFileSystem:
@classmethod
def load_file(cls, path: Upath) -> ParquetFile:
'''
Depending on the implementation, this may read *meta* info only, or
load in the entire file eagerly.
This reads *meta* info and constructs a ``pyarrow.parquet.ParquetFile`` object.
This does not load the entire file.
See :meth:`load` for eager loading.
Parameters
----------
Expand Down Expand Up @@ -156,12 +157,12 @@ def __len__(self) -> int:
return self.num_rows

def load(self) -> None:
"""Eagerly read in the whole file as a table."""
"""Eagerly read the whole file into memory as a table."""
if self._data is None:
self._data = ParquetBatchData(
self.file.read(columns=self._column_names, use_threads=True),
)
self._data.scalar_as_py = (self.scalar_as_py,)
self._data.scalar_as_py = self.scalar_as_py
if self.num_row_groups == 1:
assert self._row_groups is None
self._row_groups = [self._data]
Expand Down
1 change: 1 addition & 0 deletions src/biglist/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ def __iter__(self) -> Iterator[Element]:


SeqType = TypeVar("SeqType", bound=Seq)
'''This type variable indicates the class :class:`Seq` or a subclass thereof.'''


class Slicer(Generic[SeqType]):
Expand Down

0 comments on commit 07bf371

Please sign in to comment.