Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2d shape #54

Merged
merged 16 commits into from
Mar 20, 2023
2 changes: 1 addition & 1 deletion .github/workflows/workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:

pytest:

runs-on: ubuntu-latest
runs-on: ubuntu-20.04
if: always()
strategy:
matrix:
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ The data can be shown as an animation using the `show_model` function:
show_model(ds)
```
You can specify the blob parameters with a BlobFactory class. The DefaultBlobFactory class has some of the most common distribution functions implemented. An example would look like this:

```Python
from blobmodel import Model, DefaultBlobFactory

# use DefaultBlobFactory to define distribution functions fo random variables
bf = DefaultBlobFactory(A_dist="exp", W_dist="uniform", vx_dist="deg", vy_dist="normal")
bf = DefaultBlobFactory(A_dist="exp", wx_dist="uniform", vx_dist="deg", vy_dist="normal")

# pass on bf when creating the Model
tmp = Model(
Expand Down
1 change: 1 addition & 0 deletions blobmodel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
from .plotting import show_model
from .stochasticality import BlobFactory, DefaultBlobFactory
from .geometry import Geometry
from .pulse_shape import AbstractBlobShape, BlobShapeImpl
51 changes: 21 additions & 30 deletions blobmodel/blobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Tuple, Union
from nptyping import NDArray
import numpy as np
from .pulse_shape import AbstractBlobShape


class Blob:
Expand All @@ -10,7 +11,7 @@ class Blob:
def __init__(
self,
blob_id: int,
blob_shape: str,
blob_shape: AbstractBlobShape,
amplitude: float,
width_prop: float,
width_perp: float,
Expand Down Expand Up @@ -71,31 +72,31 @@ def discretize_blob(
x_perp, y_perp, t, Ly, periodic_y, one_dimensional=one_dimensional
)
if np.sin(self.theta) == 0:
_x_border = Ly - self.pos_y
_adjusted_Ly = Ly
x_border = Ly - self.pos_y
adjusted_Ly = Ly
else:
_x_border = (Ly - self.pos_y) / np.sin(self.theta)
_adjusted_Ly = Ly / np.sin(self.theta)
x_border = (Ly - self.pos_y) / np.sin(self.theta)
adjusted_Ly = Ly / np.sin(self.theta)
if type(t) in [int, float]:
# t has dimensionality = 0, used for testing
_number_of_y_propagations = (
self._prop_dir_blob_position(t) + _adjusted_Ly - _x_border
) // _adjusted_Ly
number_of_y_propagations = (
self._prop_dir_blob_position(t) + adjusted_Ly - x_border
) // adjusted_Ly
else:
_number_of_y_propagations = (
self._prop_dir_blob_position(t)[0, 0] + _adjusted_Ly - _x_border
) // _adjusted_Ly
number_of_y_propagations = (
self._prop_dir_blob_position(t)[0, 0] + adjusted_Ly - x_border
) // adjusted_Ly
return (
self._single_blob(
x_perp, y_perp, t, Ly, periodic_y, _number_of_y_propagations
x_perp, y_perp, t, Ly, periodic_y, number_of_y_propagations
)
+ self._single_blob(
x_perp,
y_perp,
t,
Ly,
periodic_y,
_number_of_y_propagations,
number_of_y_propagations,
x_offset=Ly * np.sin(self.theta),
y_offset=Ly * np.cos(self.theta),
one_dimensional=one_dimensional,
Expand All @@ -106,7 +107,7 @@ def discretize_blob(
t,
Ly,
periodic_y,
_number_of_y_propagations,
number_of_y_propagations,
x_offset=-Ly * np.sin(self.theta),
y_offset=-Ly * np.cos(self.theta),
one_dimensional=one_dimensional,
Expand Down Expand Up @@ -145,17 +146,13 @@ def _single_blob(
number_of_y_propagations=number_of_y_propagations,
)
)
* self._blob_arrival(t)
)

def _drain(self, t: NDArray) -> NDArray:
if isinstance(self.t_drain, (int, float)):
return np.exp(-(t - self.t_init) / self.t_drain)
return np.exp(-(t - self.t_init) / self.t_drain[np.newaxis, :, np.newaxis])

def _blob_arrival(self, t: NDArray) -> NDArray:
return np.heaviside(t - self.t_init, 1)

def _propagation_direction_shape(
self,
x: NDArray,
Expand All @@ -172,15 +169,8 @@ def _propagation_direction_shape(
)
else:
x_diffs = x - self._prop_dir_blob_position(t)

if self.blob_shape == "gauss":
return 1 / np.sqrt(np.pi) * np.exp(-(x_diffs**2 / self.width_prop**2))
elif self.blob_shape == "exp":
return np.exp(x_diffs / self.width_prop) * np.heaviside(-1.0 * (x_diffs), 1)
else:
raise NotImplementedError(
f"{self.__class__.__name__}.blob_shape not implemented"
)
theta_x = x_diffs / self.width_prop
return self.blob_shape.get_pulse_shape_prop(theta_x, ...)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... serves as a placeholder for kwargs right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep


def _perpendicular_direction_shape(
self,
Expand All @@ -197,17 +187,18 @@ def _perpendicular_direction_shape(
)
else:
y_diffs = y - self._perp_dir_blob_position()
return 1 / np.sqrt(np.pi) * np.exp(-(y_diffs**2) / self.width_perp**2)
theta_y = y_diffs / self.width_perp
return self.blob_shape.get_pulse_shape_perp(theta_y, ...)

def _prop_dir_blob_position(self, t: NDArray) -> NDArray:
return self.pos_x + (self.v_x**2 + self.v_y**2) ** 0.5 * (t - self.t_init)

def _perp_dir_blob_position(self) -> NDArray:
def _perp_dir_blob_position(self) -> float:
return self.pos_y

def _rotate(
self, origin: Tuple[float, float], x: NDArray, y: NDArray, angle: float
) -> Tuple[float, float]:
) -> Tuple[NDArray, NDArray]:
ox, oy = origin
px, py = x, y

Expand Down
54 changes: 31 additions & 23 deletions blobmodel/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .geometry import Geometry
from nptyping import NDArray
import warnings
from .pulse_shape import AbstractBlobShape, BlobShapeImpl


class Model:
Expand All @@ -21,7 +22,7 @@ def __init__(
dt: float = 0.1,
T: float = 10,
periodic_y: bool = False,
blob_shape: str = "gauss",
blob_shape: Union[AbstractBlobShape, str] = BlobShapeImpl("gauss"),
gregordecristoforo marked this conversation as resolved.
Show resolved Hide resolved
num_blobs: int = 1000,
t_drain: Union[float, NDArray] = 10,
blob_factory: BlobFactory = DefaultBlobFactory(),
Expand Down Expand Up @@ -81,7 +82,9 @@ def __init__(
T=T,
periodic_y=periodic_y,
)
self.blob_shape: str = blob_shape
self.blob_shape = (
gregordecristoforo marked this conversation as resolved.
Show resolved Hide resolved
BlobShapeImpl(blob_shape) if isinstance(blob_shape, str) else blob_shape
)
self.num_blobs: int = num_blobs
self.t_drain: Union[float, NDArray] = t_drain

Expand All @@ -98,7 +101,7 @@ def __init__(
def __str__(self) -> str:
"""string representation of Model."""
return (
f"2d Blob Model with blob shape:{self.blob_shape},"
f"2d Blob Model with"
+ f" num_blobs:{self.num_blobs} and t_drain:{self.t_drain}"
)

Expand Down Expand Up @@ -219,28 +222,33 @@ def _sum_up_blobs(

def _compute_start_stop(self, blob: Blob, speed_up: bool, error: float):
if speed_up:
_start = int(blob.t_init / self._geometry.dt)
if blob.v_x == 0:
_stop = self._geometry.t.size
else:
# ignores t_drain when calculating stop time
_stop = np.minimum(
self._geometry.t.size,
_start
+ int(
(
-np.log(error * np.sqrt(np.pi))
+ self._geometry.Lx
- blob.pos_x
)
/ (blob.v_x * self._geometry.dt)
),
start = 0
stop = self._geometry.t.size
return start, stop
start = int(
(
blob.t_init * blob.v_x
+ blob.width_prop * np.log(error * np.sqrt(np.pi))
+ blob.pos_x
)
else:
_start = 0
_stop = self._geometry.t.size

return _start, _stop
/ (self._geometry.dt * blob.v_x)
)
# ignores t_drain when calculating stop time
stop = np.minimum(
self._geometry.t.size,
start
+ int(
(
-blob.width_prop * np.log(error * np.sqrt(np.pi))
+ self._geometry.Lx
- blob.pos_x
)
/ (blob.v_x * self._geometry.dt)
),
)
return start, stop
return 0, self._geometry.t.size
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is equivalent to the blob.v_x==0 case. We could reduce a level of indentation by writing:
if blob_v_x == 0 or not speed_up:
return 0, self._geometry.t_size
speed_up code
what do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, good one!


def _reset_fields(self):
self._density = np.zeros(
Expand Down
74 changes: 74 additions & 0 deletions blobmodel/pulse_shape.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from abc import ABC, abstractmethod
import numpy as np
from nptyping import NDArray


class AbstractBlobShape(ABC):
"""Abstract class containing the blob pulse shapes. Two-dimensional blob
pulse shapes are written in the form.

phi(theta_x, theta_y) = phi_x(theta_x) * phi_y(theta_y).
"""

@abstractmethod
def get_pulse_shape_prop(self, theta_prop: NDArray, kwargs):
raise NotImplementedError

@abstractmethod
def get_pulse_shape_perp(self, theta_perp: NDArray, kwargs):
raise NotImplementedError


class BlobShapeImpl(AbstractBlobShape):
"""Implementation of the AbstractPulseShape class."""

__SHAPE_NAMES__ = {"exp", "gauss", "2-exp", "lorentz", "secant"}

def __init__(self, pulse_shape_prop="gauss", pulse_shape_perp="gauss"):
self.__GENERATORS__ = {
gregordecristoforo marked this conversation as resolved.
Show resolved Hide resolved
"exp": BlobShapeImpl._get_exponential_shape,
"gauss": BlobShapeImpl._get_gaussian_shape,
"2-exp": BlobShapeImpl._get_double_exponential_shape,
"lorentz": BlobShapeImpl._get_lorentz_shape,
"secant": BlobShapeImpl._get_secant_shape,
}
self.pulse_shape_prop = pulse_shape_prop
self.pulse_shape_perp = pulse_shape_perp
if (
pulse_shape_prop not in BlobShapeImpl.__SHAPE_NAMES__
or pulse_shape_perp not in BlobShapeImpl.__SHAPE_NAMES__
):
raise NotImplementedError(
f"{self.__class__.__name__}.blob_shape not implemented"
)

def get_pulse_shape_prop(self, theta: np.ndarray, kwargs) -> np.ndarray:
return self.__GENERATORS__.get(self.pulse_shape_prop)(theta, kwargs)

def get_pulse_shape_perp(self, theta: np.ndarray, kwargs) -> np.ndarray:
return self.__GENERATORS__.get(self.pulse_shape_perp)(theta, kwargs)

@staticmethod
def _get_exponential_shape(theta: np.ndarray, kwargs) -> np.ndarray:
return np.exp(theta) * np.heaviside(-1.0 * theta, 1)

@staticmethod
def _get_lorentz_shape(theta: np.ndarray, kwargs) -> np.ndarray:
return 1 / (np.pi * (1 + theta**2))

@staticmethod
def _get_double_exponential_shape(theta: np.ndarray, kwargs) -> np.ndarray:
lam = kwargs["lam"]
gregordecristoforo marked this conversation as resolved.
Show resolved Hide resolved
assert (lam > 0.0) & (lam < 1.0)
kern = np.zeros(len(theta))
kern[theta < 0] = np.exp(theta[theta < 0] / lam)
kern[theta >= 0] = np.exp(-theta[theta >= 0] / (1 - lam))
return kern

@staticmethod
def _get_gaussian_shape(theta: np.ndarray, kwargs) -> np.ndarray:
return 1 / np.sqrt(np.pi) * np.exp(-(theta**2))

@staticmethod
def _get_secant_shape(theta: np.ndarray, kwargs) -> np.ndarray:
return 2 / np.pi / (np.exp(theta) + np.exp(-theta))
Loading