Skip to content

Commit

Permalink
Instantiate the thread pool from the outermost layers (#234)
Browse files Browse the repository at this point in the history
* Instantiate the thread pool from the outermost layers

* Update tests
  • Loading branch information
mmwinther authored Mar 18, 2024
1 parent a00fbac commit 1d86e63
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 48 deletions.
9 changes: 5 additions & 4 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ To set up for local development run this command from the root of the repo.

1. Create a file `src/datadoc/.env`
1. Place the following lines in the file:
```
DATADOC_DASH_DEVELOPMENT_MODE=True
DATADOC_LOG_LEVEL=debug
```

```env
DATADOC_DASH_DEVELOPMENT_MODE=True
DATADOC_LOG_LEVEL=debug
```
For the full list of configuration options, see `src/datadoc/config.py`.
Expand Down
47 changes: 30 additions & 17 deletions src/datadoc/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from __future__ import annotations

import concurrent
import logging
from pathlib import Path

Expand Down Expand Up @@ -76,10 +77,13 @@ def build_app(app: type[Dash]) -> Dash:
return app


def get_app(dataset_path: str | None = None) -> tuple[Dash, int]:
def get_app(
executor: concurrent.futures.ThreadPoolExecutor,
dataset_path: str | None = None,
) -> tuple[Dash, int]:
"""Centralize all the ugliness around initializing the app."""
logger.info("Datadoc version v%s", get_app_version())
collect_data_from_external_sources()
collect_data_from_external_sources(executor)
state.current_metadata_language = SupportedLanguages.NORSK_BOKMÅL
state.metadata = DataDocMetadata(
state.statistic_subject_mapping,
Expand Down Expand Up @@ -114,43 +118,52 @@ def get_app(dataset_path: str | None = None) -> tuple[Dash, int]:
return app, port


def collect_data_from_external_sources() -> None:
def collect_data_from_external_sources(
executor: concurrent.futures.ThreadPoolExecutor,
) -> None:
"""Call classes and methods which collect data from external sources.
Must be non-blocking to prevent delays in app startup.
"""
logger.debug("Start threads - Collecting data from external sources")
state.statistic_subject_mapping = StatisticSubjectMapping(
executor,
config.get_statistical_subject_source_url(),
)

state.unit_types = CodeList(
executor,
config.get_unit_code(),
)

state.organisational_units = CodeList(
executor,
config.get_organisational_unit_code(),
)
logger.debug("Finished blocking - Collecting data from external sources")


def main(dataset_path: str | None = None) -> None:
"""Entrypoint when running as a script."""
if dataset_path:
logger.info("Starting app with dataset_path = %s", dataset_path)
app, port = get_app(dataset_path)
if running_in_notebook():
logger.info("Running in notebook")
app.run(
jupyter_mode="tab",
jupyter_server_url=config.get_jupyterhub_http_referrer(),
jupyter_height=1000,
port=port,
)
else:
if dev_mode := config.get_dash_development_mode():
logger.warning(
"Starting in Development Mode. NOT SUITABLE FOR PRODUCTION.",

with concurrent.futures.ThreadPoolExecutor(max_workers=12) as executor:
app, port = get_app(executor, dataset_path)
if running_in_notebook():
logger.info("Running in notebook")
app.run(
jupyter_mode="tab",
jupyter_server_url=config.get_jupyterhub_http_referrer(),
jupyter_height=1000,
port=port,
)
app.run(debug=dev_mode, port=port)
else:
if dev_mode := config.get_dash_development_mode():
logger.warning(
"Starting in Development Mode. NOT SUITABLE FOR PRODUCTION.",
)
app.run(debug=dev_mode, port=port)


if __name__ == "__main__":
Expand Down
10 changes: 8 additions & 2 deletions src/datadoc/backend/code_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from datadoc.enums import SupportedLanguages

if TYPE_CHECKING:
import concurrent

import pandas as pd

from klass.classes.classification import KlassClassification
Expand Down Expand Up @@ -51,7 +53,11 @@ def get_title(self, language: SupportedLanguages) -> str:
class CodeList(GetExternalSource):
"""Class for retrieving classifications from Klass."""

def __init__(self, classification_id: int | None) -> None:
def __init__(
self,
executor: concurrent.futures.ThreadPoolExecutor,
classification_id: int | None,
) -> None:
"""Retrieves a list of classifications given a classification id.
Initializes the classifications list and starts fetching the classifications.
Expand All @@ -63,7 +69,7 @@ def __init__(self, classification_id: int | None) -> None:
self._classifications: list[CodeListItem] = []
self.classification_id = classification_id
self.classifications_dataframes: dict[str, pd.DataFrame] | None = None
super().__init__()
super().__init__(executor)

def _fetch_data_from_external_source(
self,
Expand Down
15 changes: 8 additions & 7 deletions src/datadoc/backend/external_sources/external_sources.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from __future__ import annotations

import concurrent.futures
import logging
from abc import ABC
from abc import abstractmethod
from typing import TYPE_CHECKING
from typing import Generic
from typing import TypeVar

if TYPE_CHECKING:
import concurrent.futures

logger = logging.getLogger(__name__)

T = TypeVar("T")
Expand All @@ -15,16 +18,14 @@
class GetExternalSource(ABC, Generic[T]):
"""Abstract base class for getting data from external sources."""

def __init__(self) -> None:
def __init__(self, executor: concurrent.futures.ThreadPoolExecutor) -> None:
"""Retrieves data from an external source asynchronously.
Initializes the future object.
"""
self.future: concurrent.futures.Future[T | None] | None = None
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
self.future = executor.submit(
self._fetch_data_from_external_source,
)
self.future = executor.submit(
self._fetch_data_from_external_source,
)

def wait_for_external_result(self) -> None:
"""Waits for the thread responsible for loading the external request to finish."""
Expand Down
12 changes: 10 additions & 2 deletions src/datadoc/backend/statistic_subject_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING

import bs4
import requests
Expand All @@ -11,6 +12,9 @@
from datadoc.backend.external_sources.external_sources import GetExternalSource
from datadoc.enums import SupportedLanguages

if TYPE_CHECKING:
import concurrent

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -62,7 +66,11 @@ class PrimarySubject(Subject):
class StatisticSubjectMapping(GetExternalSource):
"""Allow mapping between statistic short name and primary and secondary subject."""

def __init__(self, source_url: str | None) -> None:
def __init__(
self,
executor: concurrent.futures.ThreadPoolExecutor,
source_url: str | None,
) -> None:
"""Retrieves the statistical structure document from the given URL.
Initializes the mapping dicts. Based on the values in the statistical structure document.
Expand All @@ -73,7 +81,7 @@ def __init__(self, source_url: str | None) -> None:

self._primary_subjects: list[PrimarySubject] = []

super().__init__()
super().__init__(executor)

def get_secondary_subject(self, statistic_short_name: str | None) -> str | None:
"""Looks up the secondary subject for the given statistic short name in the mapping dict.
Expand Down
7 changes: 5 additions & 2 deletions src/datadoc/wsgi.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Entrypoint for Gunicorn."""

import concurrent

from .app import get_app

datadoc_app, _ = get_app()
server = datadoc_app.server
with concurrent.futures.ThreadPoolExecutor(max_workers=12) as executor:
datadoc_app, _ = get_app(executor)
server = datadoc_app.server
4 changes: 2 additions & 2 deletions tests/backend/test_code_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def test_read_dataframe(
assert code_list_fake_structure.classifications == expected


def test_non_existent_code():
code_list = CodeList(0)
def test_non_existent_code(thread_pool_executor):
code_list = CodeList(thread_pool_executor, 0)
code_list.wait_for_external_result()
assert code_list.classifications == []
6 changes: 5 additions & 1 deletion tests/backend/test_datadoc_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,13 @@ def test_dataset_status_default_value(
def test_dataset_assessment_default_value(
expected_type: Assessment | None,
copy_dataset_to_path: Path,
thread_pool_executor,
):
datadoc_metadata = DataDocMetadata(
statistic_subject_mapping=StatisticSubjectMapping(source_url=""),
statistic_subject_mapping=StatisticSubjectMapping(
thread_pool_executor,
source_url="",
),
dataset_path=str(copy_dataset_to_path),
)
assert datadoc_metadata.dataset.assessment == expected_type
Expand Down
7 changes: 4 additions & 3 deletions tests/backend/test_statistic_subject_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from tests.utils import TEST_RESOURCES_DIRECTORY


def test_no_source_url():
subject_mapping = StatisticSubjectMapping(None)
def test_no_source_url(thread_pool_executor):
subject_mapping = StatisticSubjectMapping(thread_pool_executor, None)
subject_mapping.wait_for_external_result()
assert subject_mapping.primary_subjects == []

Expand Down Expand Up @@ -151,12 +151,13 @@ def test_get_secondary_subject(
def subject_mapping_http_exception(
requests_mock,
exception_to_raise,
thread_pool_executor,
) -> StatisticSubjectMapping:
requests_mock.get(
"http://test.some.url.com",
exc=exception_to_raise,
)
return StatisticSubjectMapping("http://test.some.url.com")
return StatisticSubjectMapping(thread_pool_executor, "http://test.some.url.com")


@pytest.mark.parametrize(
Expand Down
18 changes: 12 additions & 6 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import concurrent
import copy
import functools
import os
Expand Down Expand Up @@ -169,11 +170,17 @@ def subject_xml_file_path() -> pathlib.Path:
)


@pytest.fixture()
def thread_pool_executor(request) -> concurrent.futures.ThreadPoolExecutor:
    """Provide a thread pool executor for a single test and shut it down afterwards.

    Args:
        request: Built-in pytest fixture, used here to register teardown.

    Returns:
        A ``ThreadPoolExecutor`` with 12 workers. It is shut down when the
        requesting test finishes.
    """
    # A bare ``import concurrent`` does not load the ``futures`` submodule, so
    # import it explicitly instead of relying on some other module having
    # imported it first.
    import concurrent.futures

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=12)
    # Without an explicit shutdown every test would leave its own pool of
    # worker threads behind for the remainder of the test session.
    request.addfinalizer(executor.shutdown)
    return executor


@pytest.fixture()
def subject_mapping_fake_statistical_structure(
_mock_fetch_statistical_structure,
thread_pool_executor,
) -> StatisticSubjectMapping:
return StatisticSubjectMapping("placeholder")
return StatisticSubjectMapping(thread_pool_executor, "placeholder")


@pytest.fixture()
Expand All @@ -195,12 +202,13 @@ def fake_statistical_structure() -> ResultSet:
def subject_mapping_http_exception(
requests_mock,
exception_to_raise,
thread_pool_executor,
) -> StatisticSubjectMapping:
requests_mock.get(
"http://test.some.url.com",
exc=exception_to_raise,
)
return StatisticSubjectMapping("http://test.some.url.com")
return StatisticSubjectMapping(thread_pool_executor, "http://test.some.url.com")


@pytest.fixture()
Expand Down Expand Up @@ -239,10 +247,8 @@ def fake_code_list() -> dict[str, pd.DataFrame]:


@pytest.fixture()
def code_list_fake_structure(
_mock_fetch_dataframe,
) -> CodeList:
return CodeList(100)
def code_list_fake_structure(_mock_fetch_dataframe, thread_pool_executor) -> CodeList:
return CodeList(thread_pool_executor, 100)


@pytest.fixture()
Expand Down
8 changes: 6 additions & 2 deletions tests/test_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
from datadoc.app import get_app


def test_get_app(subject_mapping_fake_statistical_structure, code_list_fake_structure):
def test_get_app(
subject_mapping_fake_statistical_structure,
code_list_fake_structure,
thread_pool_executor,
):
state.statistic_subject_mapping = subject_mapping_fake_statistical_structure
state.code_list = code_list_fake_structure

app, _ = get_app()
app, _ = get_app(thread_pool_executor)
assert app.config["name"] == "Datadoc"
assert len(app.callback_map.items()) > 0

0 comments on commit 1d86e63

Please sign in to comment.