Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix infinite loop while fetching artifact store in logs storage class #3061

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/zenml/client_lazy_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,10 @@ def evaluate_all_lazy_load_args_in_client_methods(
"""
import inspect

def _evaluate_args(func: Callable[..., Any]) -> Any:
def _evaluate_args(
func: Callable[..., Any], is_instance_method: bool
) -> Any:
def _inner(*args: Any, **kwargs: Any) -> Any:
is_instance_method = "self" in inspect.getfullargspec(func).args

args_ = list(args)
if not is_instance_method:
from zenml.client import Client
Expand All @@ -201,7 +201,11 @@ def _inner(*args: Any, **kwargs: Any) -> Any:

def _decorate() -> Type["Client"]:
for name, fn in inspect.getmembers(cls, inspect.isfunction):
setattr(cls, name, _evaluate_args(fn))
setattr(
cls,
name,
_evaluate_args(fn, "self" in inspect.getfullargspec(fn).args),
)
return cls

return _decorate()
25 changes: 10 additions & 15 deletions src/zenml/logging/step_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
_load_artifact_store,
_load_file_from_artifact_store,
)
from zenml.client import Client
from zenml.exceptions import DoesNotExistException
from zenml.logger import get_logger
from zenml.logging import (
Expand Down Expand Up @@ -212,6 +211,7 @@ class StepLogsStorage:
def __init__(
self,
logs_uri: str,
artifact_store: "BaseArtifactStore",
max_messages: int = STEP_LOGS_STORAGE_MAX_MESSAGES,
time_interval: int = STEP_LOGS_STORAGE_INTERVAL_SECONDS,
merge_files_interval: int = STEP_LOGS_STORAGE_MERGE_INTERVAL_SECONDS,
Expand All @@ -220,6 +220,7 @@ def __init__(

Args:
logs_uri: the URI of the log file or folder.
artifact_store: Artifact Store from the current step context
max_messages: the maximum number of messages to save in the buffer.
time_interval: the amount of seconds before the buffer gets saved
automatically.
Expand All @@ -237,22 +238,11 @@ def __init__(
self.disabled_buffer: List[str] = []
self.last_save_time = time.time()
self.disabled = False
self._artifact_store: Optional["BaseArtifactStore"] = None
self.artifact_store = artifact_store

# Immutable filesystems state
self.last_merge_time = time.time()

@property
def artifact_store(self) -> "BaseArtifactStore":
"""Returns the active artifact store.

Returns:
The active artifact store.
"""
if self._artifact_store is None:
self._artifact_store = Client().active_stack.artifact_store
return self._artifact_store

def write(self, text: str) -> None:
"""Main write method.

Expand Down Expand Up @@ -406,13 +396,18 @@ def merge_log_files(self, merge_all_files: bool = False) -> None:
class StepLogsStorageContext:
"""Context manager which patches stdout and stderr during step execution."""

def __init__(self, logs_uri: str) -> None:
def __init__(
self, logs_uri: str, artifact_store: "BaseArtifactStore"
) -> None:
"""Initializes and prepares a storage object.

Args:
logs_uri: the URI of the logs file.
artifact_store: Artifact Store from the current step context.
"""
self.storage = StepLogsStorage(logs_uri=logs_uri)
self.storage = StepLogsStorage(
logs_uri=logs_uri, artifact_store=artifact_store
)

def __enter__(self) -> "StepLogsStorageContext":
"""Enter condition of the context manager.
Expand Down
2 changes: 1 addition & 1 deletion src/zenml/orchestrators/step_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def launch(self) -> None:
)

logs_context = step_logging.StepLogsStorageContext(
logs_uri=logs_uri
logs_uri=logs_uri, artifact_store=self._stack.artifact_store
) # type: ignore[assignment]

logs_model = LogsRequest(
Expand Down
3 changes: 2 additions & 1 deletion src/zenml/orchestrators/step_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ def run(
if step_logging_enabled and not redirected.get():
if step_run.logs:
logs_context = StepLogsStorageContext( # type: ignore[assignment]
logs_uri=step_run.logs.uri
logs_uri=step_run.logs.uri,
artifact_store=self._stack.artifact_store,
)
else:
logger.debug(
Expand Down
Loading