Skip to content

Commit

Permalink
feat: support for train transformers (#354)
Browse files Browse the repository at this point in the history
Introduce train transformers.
Note: train transformers are fitted first and then the preprocess
transformers are fitted next.


---------

Signed-off-by: s0nicboOm <[email protected]>
  • Loading branch information
s0nicboOm authored Mar 29, 2024
1 parent 377ada2 commit db2cc4f
Show file tree
Hide file tree
Showing 10 changed files with 12,934 additions and 465 deletions.
3 changes: 1 addition & 2 deletions numalogic/config/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ class TrainerConf:
retry_sec: int = 600 # 10 min
batch_size: int = 64
data_freq_sec: int = 60
# TODO: Support trainer based transform models
max_value_map: Optional[dict[str, float]] = None
transforms: Optional[list[ModelInfo]] = None
pltrainer_conf: LightningTrainerConf = field(default_factory=LightningTrainerConf)


Expand Down
28 changes: 14 additions & 14 deletions numalogic/transforms/_stateless.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# limitations under the License.

from collections.abc import Sequence
from typing import Union, Optional
from typing import Optional, Union

import numpy as np
import numpy.typing as npt
Expand Down Expand Up @@ -78,27 +78,27 @@ def __init__(
lower: Optional[Union[float, Sequence[float]]] = None,
upper: Optional[Union[float, Sequence[float]]] = None,
):
self._validate_args(lower, upper)
self.lower = lower
self.upper = upper
self.lower, self.upper = self._validate_args(lower, upper)

@staticmethod
def _validate_args(
lower: Union[float, Sequence[float]], upper: Union[float, Sequence[float]]
) -> None:
lower: Optional[Union[float, Sequence[float]]],
upper: Optional[Union[float, Sequence[float]]],
) -> Optional[tuple[Optional[Union[float, npt.NDArray]], Optional[Union[float, npt.NDArray]]]]:
if lower is None and upper is None:
raise ValueError("At least one of lower or upper should be provided.")

if isinstance(lower, Sequence) and isinstance(upper, Sequence) and len(lower) != len(upper):
raise ValueError("lower and upper should have the same length.")
if isinstance(lower, Sequence) and isinstance(upper, Sequence):
if len(lower) != len(upper):
raise ValueError("lower and upper should have the same length.")
lower, upper = np.asarray(lower, dtype=np.float32), np.asarray(upper, dtype=np.float32)
if upper is not None and lower is not None and np.any(lower > upper):
raise ValueError("lower value should be less than or equal to upper value")
return lower, upper

def transform(self, x: npt.NDArray[float], **__) -> npt.NDArray[float]:
_df = pd.DataFrame(x, dtype=np.float32)
if (self.lower is not None) and (self.upper is not None):
return _df.clip(lower=self.lower, upper=self.upper, axis=1).to_numpy(dtype=np.float32)
if self.upper is not None:
return _df.clip(upper=self.upper, axis=1).to_numpy(dtype=np.float32)
return _df.clip(lower=self.lower, axis=1).to_numpy(dtype=np.float32)
_df = _df.clip(upper=self.upper, lower=self.lower, axis=1)
return _df.to_numpy(dtype=np.float32)


class GaussianNoiseAdder(StatelessTransformer):
Expand Down
38 changes: 18 additions & 20 deletions numalogic/udfs/trainer/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
from torch.utils.data import DataLoader

from numalogic.config import PreprocessFactory, ModelFactory, ThresholdFactory, RegistryFactory
from numalogic.config._config import NumalogicConf
from numalogic.config._config import NumalogicConf, ModelInfo
from numalogic.models.autoencoder import TimeseriesTrainer
from numalogic.tools.data import StreamingDataset
from numalogic.tools.exceptions import ConfigNotFoundError, RedisRegistryError
from numalogic.tools.types import redis_client_t, artifact_t, KEYS, KeyedArtifact
from numalogic.udfs import NumalogicUDF
from numalogic.udfs._config import PipelineConf, MLPipelineConf
from numalogic.udfs._config import PipelineConf
from numalogic.udfs._metrics import (
REDIS_ERROR_COUNTER,
INSUFFICIENT_DATA_COUNTER,
Expand Down Expand Up @@ -76,6 +76,7 @@ def compute(
model: artifact_t,
input_: npt.NDArray[float],
preproc_clf: Optional[artifact_t] = None,
trainer_transform: Optional[artifact_t] = None,
threshold_clf: Optional[artifact_t] = None,
numalogic_cfg: Optional[NumalogicConf] = None,
) -> dict[str, KeyedArtifact]:
Expand All @@ -86,6 +87,7 @@ def compute(
model: Model artifact
input_: Input data
preproc_clf: Preprocessing artifact
trainer_transform: trainer specific preprocessing artifacts
threshold_clf: Thresholding artifact
numalogic_cfg: Numalogic configuration
Expand All @@ -101,6 +103,9 @@ def compute(
raise ConfigNotFoundError("Numalogic Trainer config not found!")
dict_artifacts = {}
trainer_cfg = numalogic_cfg.trainer
if trainer_transform:
input_ = trainer_transform.fit_transform(input_)

if preproc_clf:
input_ = preproc_clf.fit_transform(input_)
dict_artifacts["preproc_clf"] = KeyedArtifact(
Expand All @@ -123,6 +128,7 @@ def compute(

if threshold_clf:
threshold_clf.fit(train_reconerr)
_LOGGER.info("Fit data using threshold model")
dict_artifacts["threshold_clf"] = KeyedArtifact(
dkeys=[numalogic_cfg.threshold.name],
artifact=threshold_clf,
Expand Down Expand Up @@ -218,9 +224,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
_LOGGER.info("%s - Data fetched, shape: %s", payload.uuid, df.shape)

# Construct feature array
x_train, nan_counter, inf_counter = self.get_feature_arr(
df, _conf.metrics, max_value_map=_conf.numalogic_conf.trainer.max_value_map
)
x_train, nan_counter, inf_counter = self.get_feature_arr(df, _conf.metrics)
_add_summary(
summary=NAN_SUMMARY,
labels=_metric_label_values,
Expand All @@ -233,15 +237,17 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
)

# Initialize artifacts
preproc_clf = self._construct_preproc_clf(_conf)
preproc_clf = self._construct_clf(_conf.numalogic_conf.preprocess)
trainer_transform = self._construct_clf(_conf.numalogic_conf.trainer.transforms)
model = self._model_factory.get_instance(_conf.numalogic_conf.model)
thresh_clf = self._thresh_factory.get_instance(_conf.numalogic_conf.threshold)

# Train artifacts
dict_artifacts = self.compute(
model,
x_train,
model=model,
input_=x_train,
preproc_clf=preproc_clf,
trainer_transform=trainer_transform,
threshold_clf=thresh_clf,
numalogic_cfg=_conf.numalogic_conf,
)
Expand Down Expand Up @@ -274,9 +280,11 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
)
return Messages(Message.to_drop())

def _construct_preproc_clf(self, _conf: MLPipelineConf) -> Optional[artifact_t]:
def _construct_clf(self, _conf: Optional[list[ModelInfo]]) -> Optional[artifact_t]:
preproc_clfs = []
for _cfg in _conf.numalogic_conf.preprocess:
if not _conf:
return None
for _cfg in _conf:
_clf = self._preproc_factory.get_instance(_cfg)
preproc_clfs.append(_clf)
if not preproc_clfs:
Expand Down Expand Up @@ -348,7 +356,6 @@ def get_feature_arr(
raw_df: pd.DataFrame,
metrics: list[str],
fill_value: float = 0.0,
max_value_map: Optional[dict[str, float]] = None,
) -> tuple[npt.NDArray[float], float, float]:
"""
Get feature array from the raw dataframe.
Expand All @@ -369,16 +376,7 @@ def get_feature_arr(
if col not in raw_df.columns:
raw_df[col] = fill_value
nan_counter += len(raw_df)

feat_df = raw_df[metrics]
if max_value_map:
max_value_list = [max_value_map.get(col, np.nan) for col in metrics]
feat_df.clip(upper=max_value_list, inplace=True)
_LOGGER.info(
"Replaced %s with max_value_map from the map with value of %s.",
metrics,
max_value_list,
)
nan_counter += raw_df.isna().sum().all()
inf_counter = np.isinf(feat_df).sum().all()
feat_df = feat_df.fillna(fill_value).replace([np.inf, -np.inf], fill_value)
Expand Down
Loading

0 comments on commit db2cc4f

Please sign in to comment.