Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instantiate the thread pool from the outermost layers #234

Merged
merged 2 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ To set up for local development run this command from the root of the repo.

1. Create a file `src/datadoc/.env`
1. Place the following lines in the file:
```
DATADOC_DASH_DEVELOPMENT_MODE=True
DATADOC_LOG_LEVEL=debug
```

```env
DATADOC_DASH_DEVELOPMENT_MODE=True
DATADOC_LOG_LEVEL=debug
```

To see all configuration options, see `src/datadoc/config.py`

Expand Down
47 changes: 30 additions & 17 deletions src/datadoc/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from __future__ import annotations

import concurrent
import logging
from pathlib import Path

Expand Down Expand Up @@ -76,10 +77,13 @@ def build_app(app: type[Dash]) -> Dash:
return app


def get_app(dataset_path: str | None = None) -> tuple[Dash, int]:
def get_app(
executor: concurrent.futures.ThreadPoolExecutor,
dataset_path: str | None = None,
) -> tuple[Dash, int]:
"""Centralize all the ugliness around initializing the app."""
logger.info("Datadoc version v%s", get_app_version())
collect_data_from_external_sources()
collect_data_from_external_sources(executor)
state.current_metadata_language = SupportedLanguages.NORSK_BOKMÅL
state.metadata = DataDocMetadata(
state.statistic_subject_mapping,
Expand Down Expand Up @@ -114,43 +118,52 @@ def get_app(dataset_path: str | None = None) -> tuple[Dash, int]:
return app, port


def collect_data_from_external_sources(
    executor: concurrent.futures.ThreadPoolExecutor,
) -> None:
    """Call classes and methods which collect data from external sources.

    Must be non-blocking to prevent delays in app startup.

    Args:
        executor: Shared thread pool on which each fetch is submitted. The
            constructors below only submit work and return immediately;
            callers retrieve results later via ``wait_for_external_result()``
            on the respective objects.
    """
    logger.debug("Start threads - Collecting data from external sources")
    state.statistic_subject_mapping = StatisticSubjectMapping(
        executor,
        config.get_statistical_subject_source_url(),
    )

    state.unit_types = CodeList(
        executor,
        config.get_unit_code(),
    )

    state.organisational_units = CodeList(
        executor,
        config.get_organisational_unit_code(),
    )
    logger.debug("Finished blocking - Collecting data from external sources")


def main(dataset_path: str | None = None) -> None:
    """Entrypoint when running as a script.

    Args:
        dataset_path: Optional path to a dataset to open on startup.
    """
    if dataset_path:
        logger.info("Starting app with dataset_path = %s", dataset_path)

    # The thread pool is created at the outermost layer and handed down so
    # that all background fetches share a single pool. Leaving the `with`
    # block waits for outstanding futures and releases the worker threads.
    with concurrent.futures.ThreadPoolExecutor(max_workers=12) as executor:
        app, port = get_app(executor, dataset_path)
        if running_in_notebook():
            logger.info("Running in notebook")
            app.run(
                jupyter_mode="tab",
                jupyter_server_url=config.get_jupyterhub_http_referrer(),
                jupyter_height=1000,
                port=port,
            )
        else:
            if dev_mode := config.get_dash_development_mode():
                logger.warning(
                    "Starting in Development Mode. NOT SUITABLE FOR PRODUCTION.",
                )
            app.run(debug=dev_mode, port=port)


if __name__ == "__main__":
Expand Down
10 changes: 8 additions & 2 deletions src/datadoc/backend/code_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from datadoc.enums import SupportedLanguages

if TYPE_CHECKING:
import concurrent

import pandas as pd

from klass.classes.classification import KlassClassification
Expand Down Expand Up @@ -51,7 +53,11 @@ def get_title(self, language: SupportedLanguages) -> str:
class CodeList(GetExternalSource):
"""Class for retrieving classifications from Klass."""

def __init__(self, classification_id: int | None) -> None:
def __init__(
self,
executor: concurrent.futures.ThreadPoolExecutor,
classification_id: int | None,
) -> None:
"""Retrieves a list of classifications given a classification id.

Initializes the classifications list and starts fetching the classifications.
Expand All @@ -63,7 +69,7 @@ def __init__(self, classification_id: int | None) -> None:
self._classifications: list[CodeListItem] = []
self.classification_id = classification_id
self.classifications_dataframes: dict[str, pd.DataFrame] | None = None
super().__init__()
super().__init__(executor)

def _fetch_data_from_external_source(
self,
Expand Down
15 changes: 8 additions & 7 deletions src/datadoc/backend/external_sources/external_sources.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from __future__ import annotations

import concurrent.futures
import logging
from abc import ABC
from abc import abstractmethod
from typing import TYPE_CHECKING
from typing import Generic
from typing import TypeVar

if TYPE_CHECKING:
import concurrent.futures

logger = logging.getLogger(__name__)

T = TypeVar("T")
Expand All @@ -15,16 +18,14 @@
class GetExternalSource(ABC, Generic[T]):
"""Abstract base class for getting data from external sources."""

def __init__(self) -> None:
def __init__(self, executor: concurrent.futures.ThreadPoolExecutor) -> None:
"""Retrieves data from an external source asynchronously.

Initializes the future object.
"""
self.future: concurrent.futures.Future[T | None] | None = None
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
self.future = executor.submit(
self._fetch_data_from_external_source,
)
self.future = executor.submit(
self._fetch_data_from_external_source,
)

def wait_for_external_result(self) -> None:
"""Waits for the thread responsible for loading the external request to finish."""
Expand Down
12 changes: 10 additions & 2 deletions src/datadoc/backend/statistic_subject_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING

import bs4
import requests
Expand All @@ -11,6 +12,9 @@
from datadoc.backend.external_sources.external_sources import GetExternalSource
from datadoc.enums import SupportedLanguages

if TYPE_CHECKING:
import concurrent

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -62,7 +66,11 @@ class PrimarySubject(Subject):
class StatisticSubjectMapping(GetExternalSource):
"""Allow mapping between statistic short name and primary and secondary subject."""

def __init__(self, source_url: str | None) -> None:
def __init__(
self,
executor: concurrent.futures.ThreadPoolExecutor,
source_url: str | None,
) -> None:
"""Retrieves the statistical structure document from the given URL.

Initializes the mapping dicts. Based on the values in the statistical structure document.
Expand All @@ -73,7 +81,7 @@ def __init__(self, source_url: str | None) -> None:

self._primary_subjects: list[PrimarySubject] = []

super().__init__()
super().__init__(executor)

def get_secondary_subject(self, statistic_short_name: str | None) -> str | None:
"""Looks up the secondary subject for the given statistic short name in the mapping dict.
Expand Down
7 changes: 5 additions & 2 deletions src/datadoc/wsgi.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Entrypoint for Gunicorn."""

# `import concurrent` alone does not bind the `futures` submodule; import it
# explicitly so `concurrent.futures.ThreadPoolExecutor` is always available
# regardless of what other modules happen to have imported.
import concurrent.futures

from .app import get_app

# NOTE(review): exiting this `with` block at import time waits for all
# submitted fetches and then shuts the pool down — confirm nothing submits
# further work to the executor after module import.
with concurrent.futures.ThreadPoolExecutor(max_workers=12) as executor:
    datadoc_app, _ = get_app(executor)
    server = datadoc_app.server
4 changes: 2 additions & 2 deletions tests/backend/test_code_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def test_read_dataframe(
assert code_list_fake_structure.classifications == expected


def test_non_existent_code(thread_pool_executor):
    """An unknown classification id resolves to an empty classification list."""
    code_list = CodeList(thread_pool_executor, 0)
    code_list.wait_for_external_result()
    assert code_list.classifications == []
6 changes: 5 additions & 1 deletion tests/backend/test_datadoc_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,13 @@ def test_dataset_status_default_value(
def test_dataset_assessment_default_value(
    expected_type: Assessment | None,
    copy_dataset_to_path: Path,
    thread_pool_executor,
):
    """The assessment derived from the dataset matches the expected default."""
    datadoc_metadata = DataDocMetadata(
        statistic_subject_mapping=StatisticSubjectMapping(
            thread_pool_executor,
            source_url="",
        ),
        dataset_path=str(copy_dataset_to_path),
    )
    assert datadoc_metadata.dataset.assessment == expected_type
Expand Down
7 changes: 4 additions & 3 deletions tests/backend/test_statistic_subject_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from tests.utils import TEST_RESOURCES_DIRECTORY


def test_no_source_url(thread_pool_executor):
    """Without a source URL the mapping resolves to no primary subjects."""
    subject_mapping = StatisticSubjectMapping(thread_pool_executor, None)
    subject_mapping.wait_for_external_result()
    assert subject_mapping.primary_subjects == []

Expand Down Expand Up @@ -151,12 +151,13 @@ def test_get_secondary_subject(
def subject_mapping_http_exception(
    requests_mock,
    exception_to_raise,
    thread_pool_executor,
) -> StatisticSubjectMapping:
    """Build a mapping whose source URL raises the parametrized exception."""
    requests_mock.get(
        "http://test.some.url.com",
        exc=exception_to_raise,
    )
    return StatisticSubjectMapping(thread_pool_executor, "http://test.some.url.com")


@pytest.mark.parametrize(
Expand Down
18 changes: 12 additions & 6 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import concurrent
import copy
import functools
import os
Expand Down Expand Up @@ -169,11 +170,17 @@ def subject_xml_file_path() -> pathlib.Path:
)


@pytest.fixture()
def thread_pool_executor() -> concurrent.futures.ThreadPoolExecutor:
    """Provide a shared thread pool, shutting it down at test teardown.

    Yielding (rather than returning) lets the fixture release the worker
    threads after each test instead of leaking a pool per use.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=12)
    yield executor
    # Don't wait: a failing test's fetches may be blocked on the network.
    executor.shutdown(wait=False)


@pytest.fixture()
def subject_mapping_fake_statistical_structure(
    _mock_fetch_statistical_structure,
    thread_pool_executor,
) -> StatisticSubjectMapping:
    """Mapping backed by the mocked statistical structure document."""
    return StatisticSubjectMapping(thread_pool_executor, "placeholder")


@pytest.fixture()
Expand All @@ -195,12 +202,13 @@ def fake_statistical_structure() -> ResultSet:
def subject_mapping_http_exception(
    requests_mock,
    exception_to_raise,
    thread_pool_executor,
) -> StatisticSubjectMapping:
    """Build a mapping whose source URL raises the parametrized exception."""
    requests_mock.get(
        "http://test.some.url.com",
        exc=exception_to_raise,
    )
    return StatisticSubjectMapping(thread_pool_executor, "http://test.some.url.com")


@pytest.fixture()
Expand Down Expand Up @@ -239,10 +247,8 @@ def fake_code_list() -> dict[str, pd.DataFrame]:


@pytest.fixture()
def code_list_fake_structure(_mock_fetch_dataframe, thread_pool_executor) -> CodeList:
    """CodeList backed by the mocked classification dataframes."""
    return CodeList(thread_pool_executor, 100)


@pytest.fixture()
Expand Down
8 changes: 6 additions & 2 deletions tests/test_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
from datadoc.app import get_app


def test_get_app(
    subject_mapping_fake_statistical_structure,
    code_list_fake_structure,
    thread_pool_executor,
):
    """Smoke test: the app builds with a name and registered callbacks."""
    state.statistic_subject_mapping = subject_mapping_fake_statistical_structure
    state.code_list = code_list_fake_structure

    app, _ = get_app(thread_pool_executor)
    assert app.config["name"] == "Datadoc"
    assert len(app.callback_map.items()) > 0