update to Python 3.10 (#20)
* update to Python 3.10

* update

* remove loss_fn and update deps

* enhance label distribution output

* attention_mask automatic construction

* scheduler init

* make counter and idx distinct in the tracker

* remove print from tracker

* update model checkpoint name

* make tokenizer init optional

* avoid configuring model twice
pietrolesci authored Mar 13, 2024
1 parent c251104 commit 11986fe
Showing 38 changed files with 2,269 additions and 3,767 deletions.
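Most of the diff below is a mechanical migration from `typing.Optional`/`typing.Union` annotations to the PEP 604 union syntax introduced in Python 3.10. A minimal sketch of the pattern (the function itself is illustrative, not from the codebase):

from pathlib import Path

# Before (Python < 3.10), imports from `typing` were required:
#     from typing import Optional, Union
#     def save(path: Union[str, Path], round: Optional[int] = None) -> None: ...

# After (Python >= 3.10), `X | Y` works directly in annotations,
# and `X | None` replaces `Optional[X]`:
def save(path: str | Path, round: int | None = None) -> None:
    print(Path(path), round)

save("out", round=1)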
74 changes: 37 additions & 37 deletions energizer/active_learning/datastores/base.py
@@ -1,7 +1,7 @@
 from abc import ABC, abstractmethod
 from math import floor
 from pathlib import Path
-from typing import Literal, Optional, Union
+from typing import Literal

 import numpy as np
 import pandas as pd
@@ -16,15 +16,15 @@

 class ActiveLearningMixin(ABC):
     @abstractmethod
-    def pool_size(self, round: Optional[int] = None) -> int:
+    def pool_size(self, round: int | None = None) -> int:
         ...

     @abstractmethod
-    def labelled_size(self, round: Optional[int] = None) -> int:
+    def labelled_size(self, round: int | None = None) -> int:
         ...

     @abstractmethod
-    def query_size(self, round: Optional[int] = None) -> int:
+    def query_size(self, round: int | None = None) -> int:
         ...

     @abstractmethod
@@ -36,43 +36,43 @@ def label(
         self,
         indices: list[int],
         round: int,
-        validation_perc: Optional[float] = None,
-        validation_sampling: Optional[Literal["uniform", "stratified"]] = "uniform",
+        validation_perc: float | None = None,
+        validation_sampling: Literal["uniform", "stratified"] | None = "uniform",
     ) -> int:
         ...

     @abstractmethod
     def sample_from_pool(
         self,
         size: int,
-        round: Optional[int] = None,
-        random_state: Optional[RandomState] = None,
-        with_indices: Optional[list[int]] = None,
+        round: int | None = None,
+        random_state: RandomState | None = None,
+        with_indices: list[int] | None = None,
         **kwargs,
     ) -> list[int]:
         ...

     @abstractmethod
-    def save_labelled_dataset(self, save_dir: Union[str, Path]) -> None:
+    def save_labelled_dataset(self, save_dir: str | Path) -> None:
         ...

-    def pool_loader(self, *args, **kwargs) -> Optional[DataLoader]:
+    def pool_loader(self, *args, **kwargs) -> DataLoader | None:
         return self.get_loader(RunningStage.POOL, *args, **kwargs)  # type: ignore

     @abstractmethod
     def reset(self) -> None:
         ...

     @abstractmethod
-    def get_train_ids(self, round: Optional[int] = None) -> list[int]:
+    def get_train_ids(self, round: int | None = None) -> list[int]:
         ...

     @abstractmethod
-    def get_validation_ids(self, round: Optional[int] = None) -> list[int]:
+    def get_validation_ids(self, round: int | None = None) -> list[int]:
         ...

     @abstractmethod
-    def get_pool_ids(self, round: Optional[int] = None) -> list[int]:
+    def get_pool_ids(self, round: int | None = None) -> list[int]:
         ...

@@ -82,23 +82,23 @@ class ActiveDatastore(ActiveLearningMixin, Datastore):

 class ActivePandasDatastore(ActiveLearningMixin, PandasDatastore):
     _train_data: pd.DataFrame
-    _test_data: Optional[Dataset]
+    _test_data: Dataset | None

-    def train_size(self, round: Optional[int] = None) -> int:
+    def train_size(self, round: int | None = None) -> int:
         return self._train_mask(round).sum()

-    def validation_size(self, round: Optional[int] = None) -> int:
+    def validation_size(self, round: int | None = None) -> int:
         if self._validation_data is not None:
             return len(self._validation_data)
         return self._validation_mask(round).sum()

-    def pool_size(self, round: Optional[int] = None) -> int:
+    def pool_size(self, round: int | None = None) -> int:
         return self._pool_mask(round).sum()

-    def labelled_size(self, round: Optional[int] = None) -> int:
+    def labelled_size(self, round: int | None = None) -> int:
         return self._labelled_mask(round).sum()

-    def query_size(self, round: Optional[int] = None) -> int:
+    def query_size(self, round: int | None = None) -> int:
         last_round = round or self._train_data[SpecialKeys.LABELLING_ROUND].max()
         if last_round < 0:
             return self.labelled_size(last_round)
@@ -108,8 +108,8 @@ def total_rounds(self) -> int:
         return self._train_data[SpecialKeys.LABELLING_ROUND].max()

     def train_dataset(
-        self, round: Optional[int] = None, passive: Optional[bool] = False, with_indices: Optional[list[int]] = None
-    ) -> Optional[Dataset]:
+        self, round: int | None = None, passive: bool | None = False, with_indices: list[int] | None = None
+    ) -> Dataset | None:
         if passive:
             return super().train_dataset()

@@ -119,15 +119,15 @@ def train_dataset(
             mask = mask & self._train_data[SpecialKeys.ID].isin(with_indices)
         return Dataset.from_pandas(self._train_data.loc[mask], preserve_index=False)

-    def validation_dataset(self, round: Optional[int] = None) -> Optional[Dataset]:
+    def validation_dataset(self, round: int | None = None) -> Dataset | None:
         if self._validation_data is not None:
             return self._validation_data

         mask = self._validation_mask(round)
         if mask.sum() > 0:
             return Dataset.from_pandas(self._train_data.loc[mask], preserve_index=False)

-    def pool_dataset(self, round: Optional[int] = None, with_indices: Optional[list[int]] = None) -> Optional[Dataset]:
+    def pool_dataset(self, round: int | None = None, with_indices: list[int] | None = None) -> Dataset | None:
         mask = self._pool_mask(round)
         if with_indices is not None:
             mask = mask & self._train_data[SpecialKeys.ID].isin(with_indices)
@@ -139,7 +139,7 @@ def label(
         self,
         indices: list[int],
         round: int,
-        validation_perc: Optional[float] = None,
+        validation_perc: float | None = None,
         validation_sampling: Literal["uniform", "stratified"] = "uniform",
     ) -> int:
         assert isinstance(indices, list), ValueError(f"`indices` must be of type `List[int]`, not {type(indices)}.")
@@ -170,9 +170,9 @@ def label(
     def sample_from_pool(
         self,
         size: int,
-        round: Optional[int] = None,
-        random_state: Optional[RandomState] = None,
-        with_indices: Optional[list[int]] = None,
+        round: int | None = None,
+        random_state: RandomState | None = None,
+        with_indices: list[int] | None = None,
         **kwargs,
     ) -> list[int]:
         """Performs `uniform` or `stratified` sampling from the pool."""
@@ -190,7 +190,7 @@ def sample_from_pool(
             **kwargs,
         )

-    def save_labelled_dataset(self, save_dir: Union[str, Path]) -> None:
+    def save_labelled_dataset(self, save_dir: str | Path) -> None:
         path = Path(save_dir)
         path.mkdir(parents=True, exist_ok=True)
         self._train_data.loc[self._labelled_mask()].to_parquet(path / "labelled_dataset.parquet", index=False)
@@ -199,31 +199,31 @@ def save_labelled_dataset(self, save_dir: Union[str, Path]) -> None:
     Utilities
     """

-    def _labelled_mask(self, round: Optional[int] = None) -> pd.Series:
+    def _labelled_mask(self, round: int | None = None) -> pd.Series:
         mask = self._train_data[SpecialKeys.IS_LABELLED] == True  # noqa: E712
         if round is not None:
             mask = mask & (self._train_data[SpecialKeys.LABELLING_ROUND] <= round)
         return mask

-    def _train_mask(self, round: Optional[int] = None) -> pd.Series:
+    def _train_mask(self, round: int | None = None) -> pd.Series:
         return self._labelled_mask(round) & (self._train_data[SpecialKeys.IS_VALIDATION] == False)  # noqa: E712

-    def _validation_mask(self, round: Optional[int] = None) -> pd.Series:
+    def _validation_mask(self, round: int | None = None) -> pd.Series:
         return self._labelled_mask(round) & (self._train_data[SpecialKeys.IS_VALIDATION] == True)  # noqa: E712

-    def _pool_mask(self, round: Optional[int] = None) -> pd.Series:
+    def _pool_mask(self, round: int | None = None) -> pd.Series:
         mask = self._train_data[SpecialKeys.IS_LABELLED] == False  # noqa: E712
         if round is not None:
             mask = mask | (self._train_data[SpecialKeys.LABELLING_ROUND] > round)
         return mask

-    def get_train_ids(self, round: Optional[int] = None) -> list[int]:
+    def get_train_ids(self, round: int | None = None) -> list[int]:
         return self._train_data.loc[self._train_mask(round), SpecialKeys.ID].tolist()

-    def get_validation_ids(self, round: Optional[int] = None) -> list[int]:
+    def get_validation_ids(self, round: int | None = None) -> list[int]:
         return self._train_data.loc[self._validation_mask(round), SpecialKeys.ID].tolist()

-    def get_pool_ids(self, round: Optional[int] = None) -> list[int]:
+    def get_pool_ids(self, round: int | None = None) -> list[int]:
         return self._train_data.loc[self._pool_mask(round), SpecialKeys.ID].tolist()

@@ -258,7 +258,7 @@ def label(
         self,
         indices: list[int],
         round: int,
-        validation_perc: Optional[float] = None,
+        validation_perc: float | None = None,
         validation_sampling: Literal["uniform", "stratified"] = "uniform",
     ) -> int:
         n_labelled = super().label(indices, round, validation_perc, validation_sampling)
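The mask helpers in this file partition a single `_train_data` frame into labelled, validation, and pool slices using boolean Series. A self-contained sketch of that bookkeeping, where the plain column names are illustrative stand-ins for the `SpecialKeys` enum members used above:

import pandas as pd

# Toy version of the datastore's train table; "is_labelled",
# "labelling_round", and "is_validation" stand in for
# SpecialKeys.IS_LABELLED, .LABELLING_ROUND, and .IS_VALIDATION.
df = pd.DataFrame(
    {
        "uid": [0, 1, 2, 3],
        "is_labelled": [True, True, False, False],
        "labelling_round": [0, 1, -1, -1],
        "is_validation": [False, True, False, False],
    }
)

def labelled_mask(round: int | None = None) -> pd.Series:
    mask = df["is_labelled"] == True  # noqa: E712
    if round is not None:
        mask = mask & (df["labelling_round"] <= round)
    return mask

print(df.loc[labelled_mask(0), "uid"].tolist())  # [0]    labelled up to round 0
print(df.loc[~labelled_mask(), "uid"].tolist())  # [2, 3] the unlabelled pool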
14 changes: 6 additions & 8 deletions energizer/active_learning/datastores/classification.py
@@ -1,5 +1,3 @@
-from typing import Optional
-
 from datasets import Dataset
 from transformers import PreTrainedTokenizerBase
 from typing_extensions import Self
@@ -14,12 +14,12 @@ class ActivePandasDatastoreForSequenceClassification(SequenceClassificationMixin
     def from_datasets(
         cls,
         tokenizer: PreTrainedTokenizerBase,
-        uid_name: Optional[str] = None,
-        on_cpu: Optional[list[str]] = None,
-        seed: Optional[int] = 42,
-        train_dataset: Optional[Dataset] = None,
-        validation_dataset: Optional[Dataset] = None,
-        test_dataset: Optional[Dataset] = None,
+        uid_name: str | None = None,
+        on_cpu: list[str] | None = None,
+        seed: int | None = 42,
+        train_dataset: Dataset | None = None,
+        validation_dataset: Dataset | None = None,
+        test_dataset: Dataset | None = None,
     ) -> Self:
         obj = cls(seed)  # type: ignore
         obj = _from_datasets(
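For reference, a hedged usage sketch of the updated `from_datasets` signature; the toy dataset, the tokenizer checkpoint, and any preprocessing requirements are assumptions, since the diff only shows the parameter list:

from datasets import Dataset
from transformers import AutoTokenizer

from energizer.active_learning.datastores.classification import (
    ActivePandasDatastoreForSequenceClassification,
)

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")  # illustrative checkpoint

# Toy dataset; real usage may require tokenized inputs rather than raw text.
train = Dataset.from_dict({"text": ["good movie", "bad movie"], "labels": [1, 0]})

datastore = ActivePandasDatastoreForSequenceClassification.from_datasets(
    tokenizer=tokenizer,
    seed=42,
    train_dataset=train,
)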
78 changes: 33 additions & 45 deletions energizer/active_learning/strategies/base.py
@@ -1,7 +1,6 @@
 from abc import ABC, abstractmethod
-from collections.abc import Callable
 from pathlib import Path
-from typing import Any, Literal, Optional, Union
+from typing import Any, Literal

 import numpy as np
 import torch
@@ -34,29 +33,29 @@ def active_fit(
         self,
         datastore: ActiveDatastore,
         query_size: int,
-        max_rounds: Optional[int] = None,
-        max_budget: Optional[int] = None,
-        validation_perc: Optional[float] = None,
+        max_rounds: int | None = None,
+        max_budget: int | None = None,
+        validation_perc: float | None = None,
         validation_sampling: Literal["uniform", "stratified"] = "uniform",
         reinit_model: bool = True,
-        model_cache_dir: Union[str, Path] = ".model_cache",
-        max_epochs: Optional[int] = 3,
-        min_epochs: Optional[int] = None,
-        max_steps: Optional[int] = None,
-        min_steps: Optional[int] = None,
-        validation_freq: Optional[str] = "1:epoch",
-        gradient_accumulation_steps: Optional[int] = None,
-        learning_rate: Optional[float] = None,
-        optimizer: Optional[str] = None,
-        optimizer_kwargs: Optional[Union[dict, OptimizationArgs]] = None,
-        scheduler: Optional[str] = None,
-        scheduler_kwargs: Optional[Union[dict, SchedulerArgs]] = None,
+        model_cache_dir: str | Path = ".model_cache",
+        max_epochs: int | None = 3,
+        min_epochs: int | None = None,
+        max_steps: int | None = None,
+        min_steps: int | None = None,
+        validation_freq: str | None = "1:epoch",
+        gradient_accumulation_steps: int | None = None,
+        learning_rate: float | None = None,
+        optimizer: str | None = None,
+        optimizer_kwargs: dict | OptimizationArgs | None = None,
+        scheduler: str | None = None,
+        scheduler_kwargs: dict | SchedulerArgs | None = None,
         log_interval: int = 1,
         enable_progress_bar: bool = True,
-        limit_train_batches: Optional[int] = None,
-        limit_validation_batches: Optional[int] = None,
-        limit_test_batches: Optional[int] = None,
-        limit_pool_batches: Optional[int] = None,
+        limit_train_batches: int | None = None,
+        limit_validation_batches: int | None = None,
+        limit_test_batches: int | None = None,
+        limit_pool_batches: int | None = None,
     ) -> Any:
         assert not reinit_model or (
             reinit_model and model_cache_dir
@@ -103,7 +102,7 @@ def active_fit(
         )

     def run_active_fit(
-        self, datastore: ActiveDatastore, reinit_model: bool, model_cache_dir: Union[str, Path], **kwargs
+        self, datastore: ActiveDatastore, reinit_model: bool, model_cache_dir: str | Path, **kwargs
     ) -> Any:
         if reinit_model:
             self.save_state_dict(model_cache_dir)
@@ -236,10 +235,10 @@ def _setup_round(
     ) -> tuple[
         _FabricModule,
         _FabricOptimizer,
-        Optional[_LRScheduler],
-        Optional[_FabricDataLoader],
-        Optional[_FabricDataLoader],
-        Optional[_FabricDataLoader],
+        _LRScheduler | None,
+        _FabricDataLoader | None,
+        _FabricDataLoader | None,
+        _FabricDataLoader | None,
     ]:
         """Start progress tracking."""

@@ -277,20 +276,14 @@ def run_pool_evaluation(self, model: _FabricModule, loader: _FabricDataLoader) -
         return {k: np.concatenate(v) for k, v in _out.items()}

     def evaluation_step(
-        self,
-        model: _FabricModule,
-        batch: Any,
-        batch_idx: int,
-        loss_fn: Optional[Union[torch.nn.Module, Callable]],
-        metrics: Optional[METRIC],
-        stage: Union[str, RunningStage],
-    ) -> Union[dict, BATCH_OUTPUT]:
+        self, model: _FabricModule, batch: Any, batch_idx: int, metrics: METRIC | None, stage: str | RunningStage
+    ) -> dict | BATCH_OUTPUT:
         if stage != RunningStage.POOL:
-            return super().evaluation_step(model, batch, batch_idx, loss_fn, metrics, stage)  # type: ignore
+            return super().evaluation_step(model, batch, batch_idx, metrics, stage)  # type: ignore

         # keep IDs here in case user messes up in the function definition
         ids = batch[InputKeys.ON_CPU][SpecialKeys.ID]
-        pool_out = self.pool_step(model, batch, batch_idx, loss_fn, metrics)
+        pool_out = self.pool_step(model, batch, batch_idx, metrics)

         assert isinstance(pool_out, torch.Tensor), f"`{stage}_step` must return a tensor`."

@@ -299,19 +292,14 @@ def evaluation_step(

     @abstractmethod
     def pool_step(
-        self,
-        model: _FabricModule,
-        batch: Any,
-        batch_idx: int,
-        loss_fn: Optional[Union[torch.nn.Module, Callable]],
-        metrics: Optional[METRIC] = None,
+        self, model: _FabricModule, batch: Any, batch_idx: int, metrics: METRIC | None = None
     ) -> torch.Tensor:
         ...

-    def pool_epoch_end(self, output: list[dict], metrics: Optional[METRIC]) -> list[dict]:
+    def pool_epoch_end(self, output: list[dict], metrics: METRIC | None) -> list[dict]:
         return output

-    def get_pool_loader(self, datastore: ActiveDatastore, **kwargs) -> Optional[_FabricDataLoader]:
+    def get_pool_loader(self, datastore: ActiveDatastore, **kwargs) -> _FabricDataLoader | None:
         subpool_ids = kwargs.get("subpool_ids", None)
         loader = datastore.pool_loader(with_indices=subpool_ids) if subpool_ids is not None else datastore.pool_loader()

@@ -324,7 +312,7 @@ def get_pool_loader(self, datastore: ActiveDatastore, **kwargs) -> Optional[_FabricDataLoader]:
         )
         return pool_loader

-    def get_train_loader(self, datastore: ActiveDatastore, **kwargs) -> Optional[_FabricDataLoader]:
+    def get_train_loader(self, datastore: ActiveDatastore, **kwargs) -> _FabricDataLoader | None:
         # NOTE: hack -- load train dataloader with the evaluation batch size
         assert datastore._loading_params is not None
         batch_size = datastore.loading_params.batch_size
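One consequence of dropping `loss_fn` is visible in the new `pool_step` signature: strategies score pool batches from model outputs alone. A standalone sketch of an entropy-based scorer matching that parameter list (the scoring body and the plain-callable `model` are assumptions; only the signature shape mirrors the diff):

from typing import Any

import torch

def pool_step(model, batch: Any, batch_idx: int, metrics=None) -> torch.Tensor:
    logits = model(batch)  # assumed: model maps a batch to (n, num_classes) logits
    probs = torch.softmax(logits, dim=-1)
    # Predictive entropy: higher means more uncertain, i.e. a better labelling candidate.
    return -(probs * torch.log(probs + 1e-12)).sum(dim=-1)

model = torch.nn.Linear(4, 3)
scores = pool_step(model, torch.randn(8, 4), batch_idx=0)
print(scores.shape)  # torch.Size([8])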
(The remaining 35 changed files are not shown.)