Skip to content

Commit

Permalink
Fix typing errors, fix running from in-cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
LeaveMyYard committed Feb 23, 2024
1 parent b6e2eba commit f9945cb
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 28 deletions.
4 changes: 3 additions & 1 deletion robusta_krr/core/integrations/kubernetes/config_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

from __future__ import annotations

from typing import Optional

from kubernetes.client import configuration
from kubernetes.config import kube_config

Expand All @@ -25,7 +27,7 @@ def _set_config(self, client_configuration: Configuration):
class Configuration(configuration.Configuration):
def __init__(
self,
proxy: str | None = None,
proxy: Optional[str] = None,
**kwargs,
):
super().__init__(**kwargs)
Expand Down
2 changes: 1 addition & 1 deletion robusta_krr/core/integrations/prometheus/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def get_metrics_service(

async def get_history_range(
self, history_duration: datetime.timedelta
) -> tuple[datetime.datetime, datetime.datetime] | None:
) -> Optional[tuple[datetime.datetime, datetime.datetime]]:
return await self.loader.get_history_range(history_duration)

async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> list[PodData]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def get_cluster_names(self) -> Optional[List[str]]:
logger.error("Labels api not present on prometheus client")
return []

async def get_history_range(self, history_duration: timedelta) -> tuple[datetime, datetime] | None:
async def get_history_range(self, history_duration: timedelta) -> tuple[datetime, datetime]:
"""
Get the history range from Prometheus, based on container_memory_working_set_bytes.
Returns:
Expand All @@ -149,12 +149,16 @@ async def get_history_range(self, history_duration: timedelta) -> tuple[datetime
end=now,
step=timedelta(hours=1),
)
values = result[0]["values"]
try:
if isinstance(result, dict) and "result" in result:
result = result["result"]

values = result[0]["values"]
start, end = values[0][0], values[-1][0]
return datetime.fromtimestamp(start), datetime.fromtimestamp(end)
except (KeyError, IndexError):
return None
except (KeyError, IndexError) as e:
logger.debug(f"Returned from get_history_range: {result}")
raise ValueError("Error while getting history range") from e

async def gather_data(
self,
Expand Down
2 changes: 1 addition & 1 deletion robusta_krr/core/models/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class Result(pd.BaseModel):
resources: list[str] = ["cpu", "memory"]
description: Optional[str] = None
strategy: StrategyData
metadata: list[Any] = pd.Field(default_factory=list)
errors: list[dict[str, Any]] = pd.Field(default_factory=list)

def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
Expand Down
59 changes: 38 additions & 21 deletions robusta_krr/core/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(self) -> None:
self._metrics_service_loaders_error_logged: set[Exception] = set()
self._strategy = settings.create_strategy()

self.metadata: list = []
self.errors: list[dict] = []

# This executor will be running calculations for recommendations
self._executor = ThreadPoolExecutor(settings.max_workers)
Expand Down Expand Up @@ -67,7 +67,7 @@ def _greet(self) -> None:
print("")

def _process_result(self, result: Result) -> None:
result.metadata = self.metadata
result.errors = self.errors

Formatter = settings.Formatter
formatted = result.format(Formatter)
Expand Down Expand Up @@ -144,13 +144,12 @@ async def _calculate_object_recommendations(self, object: K8sObjectData) -> RunR
object.pods = await self._k8s_loader.load_pods(object)

# NOTE: Kubernetes API returned pods, but Prometheus did not
# This might happen with fast executing jobs
if object.pods != []:
object.add_warning("NoPrometheusPods")
logger.warning(
f"Was not able to load any pods for {object} from Prometheus.\n\t"
"This could mean that Prometheus is missing some required metrics.\n\t"
"Loaded pods from Kubernetes API instead.\n\t"
"See more info at https://github.com/robusta-dev/krr#requirements "
f"Was not able to load any pods for {object} from Prometheus. "
"Loaded pods from Kubernetes API instead."
)

metrics = await prometheus_loader.gather_data(
Expand All @@ -168,27 +167,42 @@ async def _calculate_object_recommendations(self, object: K8sObjectData) -> RunR
logger.info(f"Calculated recommendations for {object} (using {len(metrics)} metrics)")
return self._format_result(result)

async def _check_data_availability(self, cluster: str) -> None:
async def _check_data_availability(self, cluster: Optional[str]) -> None:
prometheus_loader = self._get_prometheus_loader(cluster)
if prometheus_loader is None:
return

history_range = await prometheus_loader.get_history_range(self._strategy.settings.history_timedelta)
logger.debug(f"History range for {cluster}: {history_range}")
enough_data = history_range is not None and self._strategy.settings.history_range_enough(history_range)

if enough_data:
try:
history_range = await prometheus_loader.get_history_range(self._strategy.settings.history_timedelta)
except ValueError:
logger.exception(f"Was not able to get history range for cluster {cluster}")
self.errors.append(
{
"name": "HistoryRangeError",
}
)
return

if history_range is None:
logger.warning(f"Was not able to load history available for cluster {cluster}.")
else:
logger.warning(f"Not enough history available for cluster {cluster}.")
logger.debug(f"History range for {cluster}: {history_range}")
enough_data = self._strategy.settings.history_range_enough(history_range)

logger.warning(
"If the cluster is freshly installed, it might take some time for the enough data to be available."
)
self.metadata.append("NotEnoughHistoryAvailable")
if not enough_data:
logger.error(f"Not enough history available for cluster {cluster}.")
try_after = history_range[0] + self._strategy.settings.history_timedelta

logger.error(
"If the cluster is freshly installed, it might take some time for the enough data to be available."
)
logger.error(
f"Enough data is estimated to be available after {try_after}, "
"but will try to calculate recommendations anyway."
)
self.errors.append(
{
"name": "NotEnoughHistoryAvailable",
"retry_after": try_after,
}
)

async def _gather_object_allocations(self, k8s_object: K8sObjectData) -> ResourceScan:
recommendation = await self._calculate_object_recommendations(k8s_object)
Expand Down Expand Up @@ -217,7 +231,10 @@ async def _collect_result(self) -> Result:

logger.info(f'Using clusters: {clusters if clusters is not None else "inner cluster"}')

await asyncio.gather(*[self._check_data_availability(cluster) for cluster in clusters])
if clusters is None:
await self._check_data_availability(None)
else:
await asyncio.gather(*[self._check_data_availability(cluster) for cluster in clusters])

with ProgressBar(title="Calculating Recommendation") as self.__progressbar:
scans_tasks = [
Expand Down

0 comments on commit f9945cb

Please sign in to comment.