Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

second but now tested Tasks generator with/without threads #291

Merged
merged 5 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v1
with:
python-version: "3.x"
python-version: "3.8"

- name: Install dependencies
run: |
Expand Down
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 2.0.0b18 (2020-10-22)

* surface dataset.nodata in COGReader.nodata property (https://github.com/cogeotiff/rio-tiler/pull/292)
* fix non-threaded tasks scheduler/filter (https://github.com/cogeotiff/rio-tiler/pull/291)

## 2.0.0b17 (2020-10-13)

* switch to morecantile for TMS definition (ref: https://github.com/cogeotiff/rio-tiler/issues/283)
Expand Down
2 changes: 1 addition & 1 deletion rio_tiler/mosaic/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def reader(asset: str, *args, **kwargs) -> Tuple[numpy.ndarray, numpy.ndarray]:
)

if not chunk_size:
chunk_size = threads or len(assets)
chunk_size = threads if threads > 1 else len(assets)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This explains why it was fine with threads=1: in that case we were processing chunks of one element.


assets_used: List[str] = []

Expand Down
42 changes: 14 additions & 28 deletions rio_tiler/tasks.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,15 @@
"""rio_tiler.tasks: tools for handling rio-tiler's future tasks."""

from concurrent import futures
from typing import (
Any,
Callable,
Dict,
Generator,
Iterator,
Optional,
Sequence,
Tuple,
Union,
)
from functools import partial
from typing import Any, Callable, Dict, Generator, Optional, Sequence, Tuple, Union

import numpy

from .constants import MAX_THREADS
from .logger import logger

TaskType = Union[
Generator[Tuple[Callable, str], None, None], Iterator[Tuple[futures.Future, str]],
]
TaskType = Sequence[Tuple[Union[futures.Future, Callable], str]]


def filter_tasks(
Expand All @@ -31,8 +20,8 @@ def filter_tasks(

Attributes
----------
tasks: list or generator
Sequence of 'concurrent.futures._base.Future' or 'callable'
tasks: list
Sequence of 'concurrent.futures._base.Future' or 'Callable'
allowed_exceptions: Tuple, optional
List of exceptions which won't be raised.

Expand All @@ -44,15 +33,12 @@ def filter_tasks(
if not allowed_exceptions:
allowed_exceptions = ()

while True:
for (future, asset) in tasks:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we don't need generators anymore, we can switch back to a simple Sequence.

try:
future, asset = next(tasks) # type: ignore
if isinstance(future, futures.Future):
yield future.result(), asset
else:
yield future, asset
except StopIteration:
break
yield future(), asset
except allowed_exceptions as err:
logger.info(err)
pass
Expand All @@ -61,15 +47,15 @@ def filter_tasks(
def create_tasks(reader: Callable, assets, threads, *args, **kwargs) -> TaskType:
"""Create Future Tasks."""
if threads and threads > 1:
logger.debug(f"Running tasks in ThreadPool with max_workers={threads}")
with futures.ThreadPoolExecutor(max_workers=threads) as executor:
return iter(
[
(executor.submit(reader, asset, *args, **kwargs), asset)
for asset in assets
]
)
return [
(executor.submit(reader, asset, *args, **kwargs), asset)
for asset in assets
]
else:
return ((reader(asset, *args, **kwargs), asset) for asset in assets)
logger.debug(f"Running tasks outside ThreadsPool (max_workers={threads})")
return [(partial(reader, asset, *args, **kwargs), asset) for asset in assets]


def multi_arrays(
Expand Down
55 changes: 50 additions & 5 deletions tests/test_mosaic.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@
def _read_tile(src_path: str, *args, **kwargs) -> Tuple[numpy.ndarray, numpy.ndarray]:
"""Read tile from an asset"""
with COGReader(src_path) as cog:
tile, mask = cog.tile(*args, **kwargs)
return tile, mask
return cog.tile(*args, **kwargs)


def _read_preview(
Expand Down Expand Up @@ -222,18 +221,64 @@ def test_mosaic_tiler_Stdev():

def test_threads():
"""Test mosaic tiler."""
assets = [asset1, asset2, asset1, asset2, asset1, asset2]
assets = [asset2, asset1, asset1, asset2, asset1, asset2]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A simple but important change: before, we were just lucky because the tiler was yielding the result for asset1 first. This change was made during testing to validate the new implementation.


# TileOutSide bounds should be ignore and thus Tile is None
(tnothread, _), _ = mosaic.mosaic_reader(assets, _read_tile, xo, yo, zo, threads=2)
assert not tnothread

# TileOutSide bounds should be ignore and thus Tile is None
(tnothread, _), _ = mosaic.mosaic_reader(assets, _read_tile, xo, yo, zo, threads=0)
assert not tnothread

(tnothread, _), _ = mosaic.mosaic_reader(assets, _read_tile, x, y, z, threads=0)
# Only cover asset1
xpp = 147
ypp = 180
zpp = 9

# Partial tile, some assets should Error with TileOutside bounds
(tnothread, _), a = mosaic.mosaic_reader(
assets,
_read_tile,
xpp,
ypp,
zpp,
threads=0,
pixel_selection=defaults.MedianMethod,
)
assert len(a) == 3
assert tnothread.shape

# Partial tile, some assets should Error with TileOutside bounds
(tnothread, _), a = mosaic.mosaic_reader(
assets,
_read_tile,
xpp,
ypp,
zpp,
threads=1,
pixel_selection=defaults.MedianMethod,
)
assert len(a) == 3
assert tnothread.shape

(tnothread, _), _ = mosaic.mosaic_reader(
assets,
_read_tile,
xpp,
ypp,
zpp,
threads=0,
pixel_selection=defaults.MedianMethod,
)
(tmulti_threads, _), _ = mosaic.mosaic_reader(
assets, _read_tile, x, y, z, threads=1
assets,
_read_tile,
xpp,
ypp,
zpp,
threads=3,
pixel_selection=defaults.MedianMethod,
)
numpy.testing.assert_array_equal(tnothread, tmulti_threads)

Expand Down