diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d68a852f..851f7bcf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -47,7 +47,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v1 with: - python-version: "3.x" + python-version: "3.8" - name: Install dependencies run: | diff --git a/CHANGES.md b/CHANGES.md index 8e44acaa..0f0b067b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,8 @@ +## 2.0.0b18 (2020-10-22) + +* surface dataset.nodata in COGReader.nodata property (https://github.com/cogeotiff/rio-tiler/pull/292) +* fix non-threaded tasks scheduler/filter (https://github.com/cogeotiff/rio-tiler/pull/291) + ## 2.0.0b17 (2020-10-13) * switch to morecantile for TMS definition (ref: https://github.com/cogeotiff/rio-tiler/issues/283) diff --git a/rio_tiler/mosaic/reader.py b/rio_tiler/mosaic/reader.py index 344d9006..63d4a92c 100644 --- a/rio_tiler/mosaic/reader.py +++ b/rio_tiler/mosaic/reader.py @@ -73,7 +73,7 @@ def reader(asset: str, *args, **kwargs) -> Tuple[numpy.ndarray, numpy.ndarray]: ) if not chunk_size: - chunk_size = threads or len(assets) + chunk_size = threads if threads > 1 else len(assets) assets_used: List[str] = [] diff --git a/rio_tiler/tasks.py b/rio_tiler/tasks.py index 6bc360fc..b92cb511 100644 --- a/rio_tiler/tasks.py +++ b/rio_tiler/tasks.py @@ -1,26 +1,15 @@ """rio_tiler.tasks: tools for handling rio-tiler's future tasks.""" from concurrent import futures -from typing import ( - Any, - Callable, - Dict, - Generator, - Iterator, - Optional, - Sequence, - Tuple, - Union, -) +from functools import partial +from typing import Any, Callable, Dict, Generator, Optional, Sequence, Tuple, Union import numpy from .constants import MAX_THREADS from .logger import logger -TaskType = Union[ - Generator[Tuple[Callable, str], None, None], Iterator[Tuple[futures.Future, str]], -] +TaskType = Sequence[Tuple[Union[futures.Future, Callable], str]] def filter_tasks( @@ -31,8 +20,8 @@ def filter_tasks( Attributes ---------- - tasks: list or generator - Sequence of 'concurrent.futures._base.Future' or 'callable' + tasks: list + Sequence of 'concurrent.futures._base.Future' or 'Callable' allowed_exceptions: Tuple, optional List of exceptions which won't be raised. @@ -44,15 +33,12 @@ def filter_tasks( if not allowed_exceptions: allowed_exceptions = () - while True: + for (future, asset) in tasks: try: - future, asset = next(tasks) # type: ignore if isinstance(future, futures.Future): yield future.result(), asset else: - yield future, asset - except StopIteration: - break + yield future(), asset except allowed_exceptions as err: logger.info(err) pass @@ -61,15 +47,15 @@ def filter_tasks( def create_tasks(reader: Callable, assets, threads, *args, **kwargs) -> TaskType: """Create Future Tasks.""" if threads and threads > 1: + logger.debug(f"Running tasks in ThreadPool with max_workers={threads}") with futures.ThreadPoolExecutor(max_workers=threads) as executor: - return iter( - [ - (executor.submit(reader, asset, *args, **kwargs), asset) - for asset in assets - ] - ) + return [ + (executor.submit(reader, asset, *args, **kwargs), asset) + for asset in assets + ] else: - return ((reader(asset, *args, **kwargs), asset) for asset in assets) + logger.debug(f"Running tasks outside ThreadsPool (max_workers={threads})") + return [(partial(reader, asset, *args, **kwargs), asset) for asset in assets] def multi_arrays( diff --git a/tests/test_mosaic.py b/tests/test_mosaic.py index d988ca86..eccf358c 100644 --- a/tests/test_mosaic.py +++ b/tests/test_mosaic.py @@ -35,8 +35,7 @@ def _read_tile(src_path: str, *args, **kwargs) -> Tuple[numpy.ndarray, numpy.ndarray]: """Read tile from an asset""" with COGReader(src_path) as cog: - tile, mask = cog.tile(*args, **kwargs) - return tile, mask + return cog.tile(*args, **kwargs) def _read_preview( @@ -222,18 +221,64 @@ def test_mosaic_tiler_Stdev(): def test_threads(): """Test mosaic tiler.""" - assets = [asset1, asset2, asset1, asset2, asset1, asset2] + assets = [asset2, asset1, asset1, asset2, asset1, asset2] # TileOutSide bounds should be ignore and thus Tile is None (tnothread, _), _ = mosaic.mosaic_reader(assets, _read_tile, xo, yo, zo, threads=2) assert not tnothread + # TileOutSide bounds should be ignore and thus Tile is None (tnothread, _), _ = mosaic.mosaic_reader(assets, _read_tile, xo, yo, zo, threads=0) assert not tnothread - (tnothread, _), _ = mosaic.mosaic_reader(assets, _read_tile, x, y, z, threads=0) + # Only cover asset1 + xpp = 147 + ypp = 180 + zpp = 9 + + # Partial tile, some assets should Error with TileOutside bounds + (tnothread, _), a = mosaic.mosaic_reader( + assets, + _read_tile, + xpp, + ypp, + zpp, + threads=0, + pixel_selection=defaults.MedianMethod, + ) + assert len(a) == 3 + assert tnothread.shape + + # Partial tile, some assets should Error with TileOutside bounds + (tnothread, _), a = mosaic.mosaic_reader( + assets, + _read_tile, + xpp, + ypp, + zpp, + threads=1, + pixel_selection=defaults.MedianMethod, + ) + assert len(a) == 3 + assert tnothread.shape + + (tnothread, _), _ = mosaic.mosaic_reader( + assets, + _read_tile, + xpp, + ypp, + zpp, + threads=0, + pixel_selection=defaults.MedianMethod, + ) (tmulti_threads, _), _ = mosaic.mosaic_reader( - assets, _read_tile, x, y, z, threads=1 + assets, + _read_tile, + xpp, + ypp, + zpp, + threads=3, + pixel_selection=defaults.MedianMethod, ) numpy.testing.assert_array_equal(tnothread, tmulti_threads)