Skip to content

Commit

Permalink
More cleaned up combined.py and similarity.py
Browse files Browse the repository at this point in the history
added missing tests

Signed-off-by: Martin <[email protected]>
  • Loading branch information
bmmtstb committed Jan 20, 2024
1 parent e505b5e commit dd8ce33
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 78 deletions.
32 changes: 32 additions & 0 deletions configs/train_pose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
name: "Train Pose-Embeddings"
description: "Train the embeddings for a pose-based embedding generator using the dgs module."

device: "cuda"
print_prio: "normal"
is_training: on

train:
batch_size: 32
epochs: 1
loss: "dummy"
metric: "dummy"

test:
batch_size: 128

# Modules

dataset:
module_name: "PoseTrack21"

pose_embedding_generator:
module_name: "LinearPBEG" # see `dgs.models.embedding_generator`
embedding_size: 16
hidden_layers: []
joint_shape: (17, 2) # (J, kp_dim)
bbox_format: "XYWH"
bias: True
# only applicable for module "KeyPointConvolutionPBEG"
# hidden_layers_kp: None
# nof_kernels: 5

14 changes: 9 additions & 5 deletions dgs/models/embedding_generator/pose_based.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""
Different pose based embedding generators.
"""
from typing import Union

import torch
from torch import nn
from torchvision import tv_tensors
Expand All @@ -27,15 +29,17 @@
}


def set_up_hidden_layer_sizes(input_size: int, output_size: int, hidden_layers: list[int] | None = None) -> list[int]:
"""Given input and output size of an FC-NN, create a list of the sizes containing each hidden layer in the network.
def set_up_hidden_layer_sizes(
input_size: int, output_size: int, hidden_layers: Union[list[int], None] = None
) -> list[int]:
"""Given the input and output size of an FC-NN,
create a list of the sizes containing each hidden layer in the network.
There might be zero hidden layers.
Params:
input_size: The size of the input to the FC-Layers.
output_size: Output-size of the FC-Layers.
hidden_layers: A list containing the dimensions of each hidden layer in this network.
Default None means no hidden layers.
hidden_layers: The dimensionality of each hidden layer in this network. Default None means no hidden layers.
Returns:
The sizes of the hidden layers including input and output size.
Expand Down Expand Up @@ -156,7 +160,7 @@ def forward(self, *data, **kwargs) -> torch.Tensor:
This modules' prediction,
describing the key-points and bounding boxes as a tensor of shape ``[B x E]``.
"""
if len(data) < 2:
if len(data) != 2:
raise ValueError(f"Data should contain key points and bounding boxes, but has length {len(data)}.")
# extract key points and bboxes from data
kp, bboxes, *_args = data
Expand Down
17 changes: 7 additions & 10 deletions dgs/models/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from dgs.models.optimizer import get_optimizer, OPTIMIZERS
from dgs.models.states import DataSample
from dgs.utils.torchtools import save_checkpoint
from dgs.utils.types import Config, FilePath, NodePath, Validations
from dgs.utils.types import Config, FilePath, Validations

module_validations: Validations = {
"batch_size": ["int", ("gte", 1)],
Expand All @@ -35,15 +35,13 @@ class EngineModule(BaseModule):
Most of the settings are defined within the configuration file in the `training` section.
Methods:
train: Train the given `nn.Module`
test: Test the given `nn.Module`
run: First train the given `nn.Module`, then test it.
Notes:
The trained module is saved every epoch.
"""

# The engine is the heart of most algorithms and therefore contains a los of stuff.
# pylint: disable = too-many-instance-attributes, too-many-arguments

loss: nn.Module
metric: nn.Module
optimizer: optim.Optimizer
Expand All @@ -58,13 +56,12 @@ class EngineModule(BaseModule):
def __init__(
self,
config: Config,
path: NodePath,
test_loader: TorchDataLoader,
get_data: Callable[[DataSample], Union[torch.Tensor, tuple[torch.Tensor, ...]]],
get_target: Callable[[DataSample], Union[torch.Tensor, tuple[torch.Tensor, ...]]],
train_loader: TorchDataLoader = None,
):
super().__init__(config, path)
super().__init__(config, [])
self.validate_params(module_validations)

self.test_dl = test_loader
Expand Down Expand Up @@ -109,10 +106,10 @@ def test(self) -> any:
pred: torch.Tensor = torch.cat(pred)
target: torch.Tensor = torch.cat(target)

rank1, mAP = (1, 1) # todo create evaluation
rank1, m_ap = (1, 1) # todo create evaluation
# at the end use the writer to save results
self.writer.add_scalar(f"Test/{self.name}/rank1", rank1, self.curr_epoch)
self.writer.add_scalar(f"Test/{self.name}/mAP", mAP, self.curr_epoch)
self.writer.add_scalar(f"Test/{self.name}/mAP", m_ap, self.curr_epoch)

return rank1

Expand Down
18 changes: 11 additions & 7 deletions dgs/models/similarity/combined.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,29 @@ class DynamicallyGatedSimilarities(CombineSimilarityModule):
It is possible that :math:`S_1` and :math:`S_2` have different shapes in at least one dimension.
"""

@staticmethod
def forward(alpha: torch.FloatTensor, s1: torch.FloatTensor, s2: torch.FloatTensor) -> torch.FloatTensor:
def forward(self, *tensors, alpha: torch.FloatTensor = torch.tensor([0.5, 0.5]), **_kwargs) -> torch.FloatTensor:
"""The forward call of this module combines two weight matrices given a third importance weight :math:`\alpha`.
:math:`\alpha` describes how important s1 is, while :math:`(1- \alpha)` does the same for s2.
All tensors should be on the same device and ``s1`` and ``s2`` should have the same shape.
Args:
alpha (torch.FloatTensor):
Weight :math:`\alpha`. Should be a FloatTensor in range [0,1].
tensors (tuple[torch.FloatTensor, ...]): Two weight matrices as tuple of FloatTensors.
Both should have values in range [0,1] and be of the same shape ``[N x T]``.
alpha: Weight :math:`\alpha`. Should be a FloatTensor in range [0,1].
The shape of :math:`\alpha` can either be ``[]``, ``[1 (x 1)]``, or ``[N x 1]``.
s1 (torch.FloatTensor): A weight matrix as FloatTensor with values in range [0,1] of shape ``[N x T]``.
s2 (torch.FloatTensor): A weight matrix as FloatTensor with values in range [0,1] of shape ``[N x T]``.
Returns:
torch.FloatTensor: The weighted similarity matrix.
Raises:
ValueError: If alpha or the matrices have invalid shapes.
"""
if len(tensors) != 2:
raise ValueError(f"There should be exactly two matrices in the tensors argument, got {len(tensors)}")
if any(not isinstance(t, torch.Tensor) for t in tensors):
raise TypeError("All matrices should be torch (float) tensors.")
s1, s2 = tensors
if (a_max := torch.max(alpha)) > 1.0 or torch.min(alpha) < 0.0:
raise ValueError(f"alpha should lie in the range [0,1], but got [{torch.min(alpha)}, {a_max}]")

Expand Down Expand Up @@ -124,7 +127,8 @@ def forward(self, *tensors, **_kwargs) -> torch.FloatTensor:
raise TypeError("All the values in args should be tensors.")
if len(tensors) != len(self.alpha):
raise ValueError(
f"The length of the similarity matrices {len(tensors)} should equal the length of alpha {len(self.alpha)}"
f"The length of the similarity matrices {len(tensors)} "
f"should equal the length of alpha {len(self.alpha)}"
)
if len(tensors) > 1 and any(t.shape != tensors[0].shape for t in tensors):
raise ValueError("The shapes of every tensor should match.")
Expand Down
12 changes: 10 additions & 2 deletions dgs/models/similarity/similarity.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,25 @@
from typing import Callable

import torch
from torch import nn
from torch.nn import CosineSimilarity, PairwiseDistance

from dgs.models.module import BaseModule
from dgs.utils.types import Config, NodePath


class SimilarityModule(BaseModule):
"""Abstract class for similarity functions"""
class SimilarityModule(BaseModule, nn.Module):
"""Abstract class for similarity functions.
TODO sizes / shapes? input and output?
"""

func: Callable[..., torch.Tensor]

def __init__(self, config: Config, path: NodePath):
BaseModule.__init__(self, config, path)
nn.Module.__init__(self)

def __call__(self, *args, **kwargs) -> any: # pragma: no cover
"""see self.forward()"""
return self.forward(*args, **kwargs)
Expand Down
4 changes: 1 addition & 3 deletions dgs/tracker_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ def __init__(self, tracker_cfg: FilePath) -> None:
test_dataset: BaseDataset = module_loader(self.cfg, "dataset")
test_dl = get_data_loader(test_dataset, self.cfg["batch_size"])

self.engine: EngineModule = EngineModule(
self.cfg, path=["train", "test"], test_loader=test_dl, get_data=..., get_target=...
)
self.engine: EngineModule = EngineModule(self.cfg, test_loader=test_dl, get_data=..., get_target=...)

@enable_keyboard_interrupt
def run(self) -> None:
Expand Down
45 changes: 31 additions & 14 deletions tests/models/embedding_generator/test__combined.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def test_dgs_forward(self, device: Device):
(torch.tensor([[0.3]]), torch.ones((N, T)), torch.ones((N, T)), torch.ones((N, T))),
(torch.ones(N) * 0.3, torch.ones((N, T)), torch.ones((N, T)), torch.ones((N, T))),
(torch.ones((N, 1)) * 0.3, torch.ones((N, T)), torch.ones((N, T)), torch.ones((N, T))),
(torch.ones((1, 1, N, 1)) * 0.3, torch.ones((N, T)), torch.ones((N, T)), torch.ones((N, T))),
]:
with self.subTest(
msg="alpha: {}, s1: {}, s2: {}, result: {}, device: {}".format(alpha, s1, s2, result, device)
Expand All @@ -33,39 +34,55 @@ def test_dgs_forward(self, device: Device):
# send matrices to the respective device
self.assertTrue(
torch.allclose(
dgs.forward(alpha.to(device=device), s1.to(device=device), s2.to(device=device)),
dgs.forward(s1.to(device=device), s2.to(device=device), alpha=alpha.to(device=device)),
result.to(device=device),
)
)

def test_dgs_forward_raises(self):
N = 7
T = 21
for alpha, s1, s2, exception_type, msg in [
(torch.tensor([-0.1]), torch.ones((N, T)), torch.ones((N, T)), ValueError, "alpha should lie in the range"),
(torch.tensor([1.1]), torch.ones((N, T)), torch.ones((N, T)), ValueError, "alpha should lie in the range"),
(torch.ones((2, 2, 1)), torch.ones((N, T)), torch.ones((N, T)), ValueError, "alpha has the wrong shape"),
(torch.ones((2, 2)), torch.ones((N, T)), torch.ones((N, T)), ValueError, "If alpha is two dimensional"),
(torch.ones(1), torch.ones((N + 1, T)), torch.ones((N, T)), ValueError, "s1 and s2 should have the same"),
(torch.ones(1), torch.ones((N, T + 1)), torch.ones((N, T)), ValueError, "s1 and s2 should have the same"),
(torch.ones(1), torch.ones((N, T)), torch.ones((N + 1, T)), ValueError, "s1 and s2 should have the same"),
(torch.ones(1), torch.ones((N, T)), torch.ones((N, T + 1)), ValueError, "s1 and s2 should have the same"),
for alpha, sn, exception_type, msg in [
(
torch.tensor([-0.1]),
(torch.ones((N, T)), torch.ones((N, T))),
ValueError,
"alpha should lie in the range",
),
(
torch.tensor([1.1]),
(torch.ones((N, T)), torch.ones((N, T))),
ValueError,
"alpha should lie in the range",
),
(torch.ones((2, 2, 1)), (torch.ones((N, T)), torch.ones((N, T))), ValueError, "alpha has the wrong shape"),
(torch.ones((2, 2)), (torch.ones((N, T)), torch.ones((N, T))), ValueError, "If alpha is two dimensional"),
(torch.ones(1), (torch.ones((N + 1, T)), torch.ones((N, T))), ValueError, "s1 and s2 should have the same"),
(torch.ones(1), (torch.ones((N, T + 1)), torch.ones((N, T))), ValueError, "s1 and s2 should have the same"),
(torch.ones(1), (torch.ones((N, T)), torch.ones((N + 1, T))), ValueError, "s1 and s2 should have the same"),
(torch.ones(1), (torch.ones((N, T)), torch.ones((N, T + 1))), ValueError, "s1 and s2 should have the same"),
(torch.tensor([-0.1]), (np.ones((N, T)), torch.ones((N, T))), TypeError, "All matrices should be torch"),
(
torch.ones((N + 1, 1)).float(),
torch.ones((N, T)).float(),
torch.ones((N, T)).float(),
(torch.ones((N, T)).float(), torch.ones((N, T)).float()),
ValueError,
"the first dimension has to equal",
),
(
torch.tensor([-0.1]),
(torch.ones((N, T)), torch.ones((N, T)), torch.ones((N, T))),
ValueError,
"There should be exactly two matrices in the tensors argument",
),
]:
with self.subTest(
msg="alpha: {}, s1: {}, s2: {}, excp: {}, msg: {}".format(
alpha.shape, s1.shape, s2.shape, exception_type, msg
alpha.shape, sn[0].shape, sn[0].shape, exception_type, msg
)
):
dgs = DynamicallyGatedSimilarities(config=default_cfg, path=["combined_similarity"])
with self.assertRaises(exception_type) as e:
dgs.forward(alpha, s1, s2)
dgs.forward(*sn, alpha=alpha)
self.assertTrue(msg in str(e.exception), msg=e.exception)


Expand Down
49 changes: 49 additions & 0 deletions tests/models/embedding_generator/test__pose_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,55 @@ def test_linear_PBEG_out_shape(self, device: Device):
self.assertEqual(res.device.type, device.type)
self.assertEqual(res.shape, out_shape)

def test_linear_PBEG_flattened(self):
batch_size = 7
params = {
"hidden_layers": [],
"joint_shape": (21, 2),
"nof_kernels": 10,
"embedding_size": 5,
"bbox_format": "xyxy",
}
cfg = fill_in_defaults({"pose_embedding_generator": params, "batch_size": batch_size})
m = LinearPBEG(config=cfg, path=["pose_embedding_generator"])
kp = torch.rand((batch_size, *params["joint_shape"])).reshape((batch_size, -1))
bbox = torch.rand((batch_size, 4)).reshape((batch_size, -1))
res = m.forward(torch.hstack([kp, bbox]).reshape((batch_size, -1)))
self.assertEqual(res.shape, (batch_size, 5))

def test_linear_PBEG_raises(self):
batch_size = 7
params = {
"hidden_layers": [],
"joint_shape": (21, 2),
"nof_kernels": 10,
"embedding_size": 5,
"bbox_format": "xyxy",
}
cfg = fill_in_defaults({"pose_embedding_generator": params, "batch_size": batch_size})
m = LinearPBEG(config=cfg, path=["pose_embedding_generator"])
with self.assertRaises(ValueError) as e:
m.forward()
self.assertTrue("Data should contain key points and bounding boxes, but has length" in str(e.exception))

def test_KPCPBEG_raises(self):
batch_size = 7
params = {
"hidden_layers": [],
"joint_shape": (21, 2),
"nof_kernels": 10,
"embedding_size": 4,
"bbox_format": "xyxy",
}
cfg = fill_in_defaults({"pose_embedding_generator": params, "batch_size": batch_size})
m = KeyPointConvolutionPBEG(config=cfg, path=["pose_embedding_generator"])
kp = torch.rand((batch_size, *params["joint_shape"]))
bbox = tv_tensors.BoundingBoxes(torch.rand((batch_size, 4)), format="XYWH", canvas_size=(100, 100))

with self.assertRaises(ValueError) as e:
m.forward(kp, bbox, "dummy")
self.assertTrue("Data should contain key points and bounding boxes, but has length" in str(e.exception))

@test_multiple_devices
def test_KPCPBEG_out_shape(self, device: Device):
for batch_size, params, out_shape in [
Expand Down
Loading

0 comments on commit dd8ce33

Please sign in to comment.