Timeout and retry mechanism for data sends to IoT Agents (IOTAs) #85

Merged: 5 commits, May 17, 2024
7 changes: 6 additions & 1 deletion python-lib/tc_etl_lib/README.md
@@ -271,7 +271,7 @@ The library is organised into different classes depending on the functionality
- :param required `endpoint`: defines the Context Broker endpoint (example: https://`<service>`:`<port>`). It must be specified in the constructor of the cbManager object; otherwise a ValueError exception is raised.
- :param optional `timeout`: timeout in seconds (default: 10).
- :param optional `post_retry_connect`: number of retries when sending data (default: 3)
- :param optional `post_retry_backoff_factor`: factor used to wait an increasing number of seconds between retries when sending data. (default: 0)
- :param optional `post_retry_backoff_factor`: factor used to wait an increasing number of seconds between retries when sending data. (default: 20)
- :param optional `sleep_send_batch`: pause in seconds applied after each batch of data is sent (default: 0).
- :param optional `cb_flowcontrol`: Context Broker option that improves performance for massive data sends (batch updates). This mechanism requires starting the Context Broker with a specific flag and adding this option to the data send requests. Reference in the [Orion documentation](https://fiware-orion.readthedocs.io/en/master/admin/perf_tuning/index.html#updates-flow-control-mechanism) (default: False)
- :param optional `block_size`: when data is sent to the Context Broker through the `send_batch` function, it is sent in chunks that do not exceed the indicated block_size (default: 800000). The block_size value may be changed, but must not exceed the 800000 limit. If a value above that limit is given, a ValueError exception is raised indicating that the allowed limit has been exceeded. A constructor sketch using these parameters is shown after this list.
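
For illustration only, a minimal sketch of a cbManager built with the options above. It is not taken from the library's own docs: the parameter names follow the list above, the import style is assumed, and the endpoint and values are placeholders.

```python
import tc_etl_lib as tc

# Illustrative sketch: parameter names follow the documentation above,
# the endpoint and values are placeholders, not a real deployment.
cb = tc.cbManager(
    endpoint='https://my-context-broker:1026',  # placeholder endpoint
    timeout=10,                    # seconds per request
    post_retry_connect=3,          # number of retries on failed sends
    post_retry_backoff_factor=20,  # grows the wait between retries
    sleep_send_batch=0,            # pause between batches, in seconds
    cb_flowcontrol=False,          # requires Orion started with flow control enabled
    block_size=800000,             # maximum size of each batch chunk
)
```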
@@ -361,6 +361,9 @@ The library is organised into different classes depending on the functionality
- :param required `device_id`: the device ID.
- :param required `api_key`: the API key of the device.
- :param required `endpoint`: the URL of the service the data is sent to.
- :param optional `timeout`: timeout in seconds (default: 10).
- :param optional `post_retry_connect`: number of retries when sending data (default: 3)
- :param optional `post_retry_backoff_factor`: factor used to wait an increasing number of seconds between retries when sending data. (default: 20)
Comment on lines +364 to +366
Contributor

I see that the parameter definition here is analogous to the one for the CB, with the same parameter names and descriptions. Nice! :)

However, there is one difference: here a default of 20 is used for post_retry_backoff_factor, while the CB uses 0 (which, as I understand it, means applying no wait at all).

Maybe we should also leave it at 0 here, for homogeneity. Or the opposite: change the CB so it is 20, which would also be homogeneous (a slight backwards-compatibility change, but an acceptable one).

@arcosa what do you think? Why was 0 chosen as the default when we implemented this for the CB?

Author

Hi, I believe that in the cb the default for post_retry_backoff_factor is 20, but it may be documented incorrectly in the README.md.

Collaborator

Yes, funny, I did not know the CB also used that variable (or a similar one). My past self chose that value thinking of it as a "multiplier" factor that increases the time between retries. If you set it to 0, the retries fire immediately. With transient connectivity problems (a service goes down, is restarting, some small network issue, etc.), firing 3 retries with no gap between them means all 3 will most likely fail. With a factor of 20, I think it started retrying after a few seconds and the last retries waited around a minute; if the connectivity problem is still there after a few minutes, it gives up. All of this was thought out in the context of sending batch data to the Context Broker from ETLs that can afford to wait a little before the ETL blows up because it could not send the data.

I was checking that in the official docs the backoff is disabled (factor set to 0). And if the CB also uses it and its default is zero, we could consider setting it to zero here as well. In fact, looking at the official docs at https://pypi.org/project/tc-etl-lib/ it looks as if the default were 0, but in the code we have it at 20.
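
For reference, urllib3's Retry waits roughly backoff_factor * 2 ** (retry - 1) seconds between attempts (the first retry usually fires immediately, and the exact formula and cap vary by urllib3 version), so the numbers below are only a rough sketch of that schedule:

```python
# Rough sketch of urllib3's exponential backoff schedule; the first retry
# typically fires immediately and later waits are capped (120 s by default),
# but exact behaviour differs across urllib3 versions.
def backoff_schedule(backoff_factor: float, retries: int, backoff_max: float = 120.0):
    waits = []
    for attempt in range(1, retries + 1):
        wait = 0.0 if attempt == 1 else min(backoff_factor * 2 ** (attempt - 1), backoff_max)
        waits.append(wait)
    return waits

print(backoff_schedule(0.0, 3))   # [0.0, 0.0, 0.0]  -> three back-to-back retries
print(backoff_schedule(20.0, 3))  # [0.0, 40.0, 80.0] -> roughly two minutes in total
```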

Contributor

If the code has it at 20, then we simply need to fix the CB part of the documentation and that would be it. @CeciliaFili can you include it in this PR, pls?

Author

Sure!

Author

@fgalan Done

- :param optional `sleep_send_batch`: wait time between each data send, in seconds (default: 0).
- `send_http`: function that sends a JSON payload to the IoT Agent via an HTTP request (a usage sketch follows this list).
- :param required: `data`: data to send. The structure must contain key-value pairs (a dictionary).
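
For illustration only, a minimal sketch of an iotaManager built with the new options, followed by a send_http call. The import path is assumed from the package layout (tc_etl_lib/iota.py), and the endpoint, device_id and api_key values are placeholders.

```python
from tc_etl_lib.iota import iotaManager

# Illustrative values only; endpoint, device_id and api_key are placeholders.
iota = iotaManager(
    endpoint='https://my-iot-agent:7896/iot/json',  # placeholder URL
    device_id='device001',
    api_key='myapikey',
    sleep_send_batch=0,            # pause between sends in send_batch_http
    timeout=10,                    # seconds per request
    post_retry_connect=3,          # retries on failed sends
    post_retry_backoff_factor=20,  # exponential backoff multiplier between retries
)

# send_http expects a dictionary of key-value pairs
iota.send_http({'temperature': 21.5, 'humidity': 60})
```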
@@ -505,6 +508,8 @@ TOTAL 403 221 45%

## Changelog

- Add: new optional parameters `timeout`, `post_retry_connect`, `post_retry_backoff_factor` in the iotaManager constructor, used for the timeout/retry logic in send_http and send_batch_http ([#72](https://github.com/telefonicasc/etl-framework/issues/72))

- Fix: rename sensor_id to device_id in iotManager to align with the term used in IOTA library ([#77](https://github.com/telefonicasc/etl-framework/pull/77))

0.11.0 (February 2nd, 2024)
78 changes: 57 additions & 21 deletions python-lib/tc_etl_lib/tc_etl_lib/iota.py
@@ -29,7 +29,15 @@
import requests
import tc_etl_lib as tc
import time
from typing import Iterable, Optional, Union
import logging
from typing import Iterable, Union
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import urllib3, urllib3.exceptions
urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning)

logger = logging.getLogger(__name__)


class SendBatchError(Exception):
"SendBatchError is a class that can handle exceptions."
@@ -38,9 +46,9 @@ def __init__(self, message, original_exception=None, index=None):
self.original_exception = original_exception
self.index = index


class iotaManager:
"""IoT Agent Manager.

endpoint: define service endpoint iota (example: https://<service>:<port>).
device_id: device ID.
api_key: API key of the corresponding device.
@@ -51,12 +59,29 @@ class iotaManager:
device_id: str
api_key: str
sleep_send_batch: float
timeout: int = 10
post_retry_connect: int = 3
post_retry_backoff_factor: int = 20
session = None

def __init__(self, endpoint: str, device_id: str, api_key: str, sleep_send_batch: float = 0):
def __init__(self, endpoint: str, device_id: str, api_key: str, sleep_send_batch: float = 0, timeout: int = 10, post_retry_connect: int = 3, post_retry_backoff_factor: int = 20, session: requests.Session = None):
self.endpoint = endpoint
self.device_id = device_id
self.api_key = api_key
self.sleep_send_batch = sleep_send_batch
self.post_retry_connect = post_retry_connect
self.post_retry_backoff_factor = post_retry_backoff_factor
self.timeout = timeout
if session is None:
self.session = requests.Session()
else:
self.session = session

def __del__(self):
try:
self.session.close()
except Exception:
logger.error(f'Error closing session with endpoint: {self.endpoint}')

def send_http(self,
data: dict) -> Union[None, bool]:
@@ -71,9 +96,21 @@ def send_http(self,
headers = {
"Content-Type": "application/json"
}
http = self.session
retry_strategy = Retry(
total=self.post_retry_connect,
read=self.post_retry_connect,
backoff_factor=self.post_retry_backoff_factor,
status_forcelist=(429, 500, 502, 503, 504),
method_whitelist=('HEAD', 'GET', 'OPTIONS', 'POST')
)

adapter = HTTPAdapter(max_retries=retry_strategy)
http.mount('http://', adapter)
http.mount('https://', adapter)

try:
resp = requests.post(url=self.endpoint, json=data, params=params, headers=headers)
resp = http.post(url=self.endpoint, json=data, params=params, headers=headers)
if resp.status_code == 200:
return True
else:
@@ -87,20 +124,19 @@ def send_http(self,
raise e

def send_batch_http(self, data: Iterable) -> Union[None, bool]:

if isinstance(data, pd.DataFrame):
# Convert each row of the DataFrame to a dictionary.
for i, row in data.iterrows():
try:
self.send_http(row.to_dict())
time.sleep(self.sleep_send_batch)
except Exception as e:
raise SendBatchError(f"send_batch_http error. Row that caused the error: {i}\nError detail: {str(e)}", original_exception=e, index=i) from e
else:
for i, dictionary in enumerate(data):
try:
self.send_http(dictionary)
time.sleep(self.sleep_send_batch)
except Exception as e:
raise SendBatchError(f"send_batch_http error. Index where the error occurred: {i}\nError detail: {str(e)}", original_exception=e, index=i) from e
return True
if isinstance(data, pd.DataFrame):
# Convert each row of the DataFrame to a dictionary.
for i, row in data.iterrows():
try:
self.send_http(row.to_dict())
time.sleep(self.sleep_send_batch)
except Exception as e:
raise SendBatchError(f"send_batch_http error. Row that caused the error: {i}\nError detail: {str(e)}", original_exception=e, index=i) from e
else:
for i, dictionary in enumerate(data):
try:
self.send_http(dictionary)
time.sleep(self.sleep_send_batch)
except Exception as e:
raise SendBatchError(f"send_batch_http error. Index where the error occurred: {i}\nError detail: {str(e)}", original_exception=e, index=i) from e
return True
35 changes: 21 additions & 14 deletions python-lib/tc_etl_lib/tc_etl_lib/test_iota.py
@@ -34,8 +34,9 @@ class TestIotaManager(unittest.TestCase):
def test_send_http_success(self):
"""A success message should be displayed when
HTTP request is executed successfully."""
iot = iotaManager(endpoint='http://fakeurl.com', device_id='fake_device_id', api_key='fake_api_key')
with patch('requests.post') as mock_post:
session = requests.Session()
iot = iotaManager(endpoint='http://fakeurl.com', device_id='fake_device_id', api_key='fake_api_key', session=session)
with patch.object(session, 'post') as mock_post:
fake_response = Mock()
# Simulates a successful code status.
fake_response.status_code = 200
@@ -45,8 +46,9 @@ def test_send_http_success(self):

def test_send_http_connection_error(self):
"""Should raise an exception when there is a server connection error."""
iot = iotaManager(endpoint='http://fakeurl.com', device_id='fake_device_id', api_key='fake_api_key')
with patch('requests.post') as mock_post:
session = requests.Session()
iot = iotaManager(endpoint='http://fakeurl.com', device_id='fake_device_id', api_key='fake_api_key', session=session)
with patch.object(session, 'post') as mock_post:
mock_post.side_effect = requests.exceptions.ConnectionError()
with pytest.raises(requests.exceptions.ConnectionError):
iot.send_http(data={"key": "value"})
@@ -73,8 +75,9 @@ def test_send_http_set_not_unique(self):

def test_send_http_unauthorized(self):
"""Should raise an exception when the request is unauthorized."""
iot = iotaManager(endpoint='http://fakeurl.com', device_id='fake_device_id', api_key='fake_api_key')
with patch('requests.post') as mock_post:
session = requests.Session()
iot = iotaManager(endpoint='http://fakeurl.com', device_id='fake_device_id', api_key='fake_api_key', session=session)
with patch.object(session, 'post') as mock_post:
mock_post.return_value.status_code = 401
with pytest.raises(exceptions.FetchError) as exc_info:
iot.send_http(data={"key": "value"})
@@ -84,8 +87,9 @@ def test_send_http_unauthorized(self):

def test_send_http_not_found(self):
"""Should raise an exception when the request is not found."""
iot = iotaManager(endpoint='http://fakeurl.com', device_id='fake_device_id', api_key='fake_api_key')
with patch('requests.post') as mock_post:
session = requests.Session()
iot = iotaManager(endpoint='http://fakeurl.com', device_id='fake_device_id', api_key='fake_api_key', session=session)
with patch.object(session, 'post') as mock_post:
mock_post.return_value.status_code = 404
with pytest.raises(exceptions.FetchError) as exc_info:
iot.send_http(data={"key": "value"})
@@ -96,8 +100,9 @@ def test_send_http_not_found(self):

def test_send_http_server_error(self):
"""Should raise an exception if there is a server error."""
iot = iotaManager(endpoint='http://fakeurl.com', device_id='fake_device_id', api_key='fake_api_key')
with patch('requests.post') as mock_post:
session = requests.Session()
iot = iotaManager(endpoint='http://fakeurl.com', device_id='fake_device_id', api_key='fake_api_key', session=session)
with patch.object(session, 'post') as mock_post:
mock_post.return_value.status_code = 500
with pytest.raises(exceptions.FetchError) as exc_info:
iot.send_http(data={"key": "value"})
@@ -115,8 +120,9 @@ def test_send_batch_http_dict_success(self):

def test_send_batch_http_dict_value_error(self):
"""Should raise TypeError and then raise SendBatchError with the index that failed."""
iot = iotaManager(endpoint='http://fakeurl.com', device_id='fake_device_id', api_key='fake_api_key', sleep_send_batch=0.25)
with patch('requests.post') as mock_post:
session = requests.Session()
iot = iotaManager(endpoint='http://fakeurl.com', device_id='fake_device_id', api_key='fake_api_key', sleep_send_batch=0.25, session=session)
with patch.object(session, 'post') as mock_post:
mock_post.return_value.status_code = 200
with self.assertRaises(SendBatchError) as context:
iot.send_batch_http(data=[{"key_1": "value_1"}, 2])
@@ -128,8 +134,9 @@ def test_send_batch_http_dict_value_error(self):

def test_send_batch_http_connection_error(self):
"""Should raise a ConnectionError exception and then raise SedBatchError with the index that failed."""
iot = iotaManager(endpoint='http://fakeurl.com', device_id='fake_device_id', api_key='fake_api_key', sleep_send_batch=0.25)
with patch('requests.post') as mock_post:
session = requests.Session()
iot = iotaManager(endpoint='http://fakeurl.com', device_id='fake_device_id', api_key='fake_api_key', sleep_send_batch=0.25, session=session)
with patch.object(session, 'post') as mock_post:
mock_success = MagicMock()
mock_success.status_code = 200
