Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove experimental worker logs setting #16024

Merged
merged 2 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 1 addition & 12 deletions docs/3.0/deploy/infrastructure-concepts/workers.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,8 @@ If a worker misses three heartbeats, it is considered offline. By default, a wor
after it stopped sending heartbeats, but you can configure the threshold with the `PREFECT_WORKER_HEARTBEAT_SECONDS` setting.

### Worker logs
<Warning>This feature is experimental as of Prefect version 3.1.1</Warning>
<span class="badge cloud"></span> Workers send logs to the Prefect Cloud API if you're connected to Prefect Cloud.

<span class="badge cloud"></span> Workers can now send logs directly to Prefect Cloud.

To enable this feature, run the command:
```bash
prefect config set PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED=true
```
See our [profile settings](/3.0/manage/settings-and-profiles/) documentation for more information about changing settings.

Once enabled:
- All worker logs are automatically sent to the Prefect Cloud API
- Logs are accessible through both the Prefect Cloud UI and API
- Each flow run will include a link to its associated worker's logs
Expand All @@ -126,8 +117,6 @@ Once enabled:
- Installed Prefect integrations (e.g., `prefect-aws`, `prefect-gcp`)
- Live worker logs (if worker logging is enabled)

<Warning>Worker logging is an experimental feature as of Prefect version 3.1.1.</Warning> If your worker is using an earlier version of Prefect or you have not opted in to the experiment, this page will not show worker logs.

Access a worker's details by clicking on the worker's name in the Work Pool list.

### Start a worker
Expand Down
12 changes: 0 additions & 12 deletions docs/3.0/develop/settings-ref.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -455,18 +455,6 @@ If `True`, warn on usage of experimental features.
**Supported environment variables**:
`PREFECT_EXPERIMENTS_WARN`, `PREFECT_EXPERIMENTAL_WARN`

### `worker_logging_to_api_enabled`
Enables the logging of worker logs to Prefect Cloud.

**Type**: `boolean`

**Default**: `False`

**TOML dotted key path**: `experiments.worker_logging_to_api_enabled`

**Supported environment variables**:
`PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED`

### `telemetry_enabled`
Enables sending telemetry to Prefect Cloud.

Expand Down
9 changes: 0 additions & 9 deletions schemas/settings.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -319,15 +319,6 @@
"title": "Warn",
"type": "boolean"
},
"worker_logging_to_api_enabled": {
"default": false,
"description": "Enables the logging of worker logs to Prefect Cloud.",
"supported_environment_variables": [
"PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED"
],
"title": "Worker Logging To Api Enabled",
"type": "boolean"
},
"telemetry_enabled": {
"default": false,
"description": "Enables sending telemetry to Prefect Cloud.",
Expand Down
9 changes: 5 additions & 4 deletions src/prefect/logging/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
PREFECT_LOGGING_TO_API_BATCH_SIZE,
PREFECT_LOGGING_TO_API_MAX_LOG_SIZE,
PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW,
get_current_settings,
)


Expand Down Expand Up @@ -241,10 +240,12 @@ def _get_payload_size(self, log: Dict[str, Any]) -> int:

class WorkerAPILogHandler(APILogHandler):
def emit(self, record: logging.LogRecord):
if get_current_settings().experiments.worker_logging_to_api_enabled:
super().emit(record)
else:
# Open-source API servers do not currently support worker logs, and
# worker logs only have an associated worker ID when connected to Cloud,
# so we won't send worker logs to the API unless they have a worker ID.
if not getattr(record, "worker_id", None):
return
super().emit(record)

def prepare(self, record: logging.LogRecord) -> Dict[str, Any]:
"""
Expand Down
5 changes: 0 additions & 5 deletions src/prefect/settings/models/experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ class ExperimentsSettings(PrefectBaseSettings):
),
)

worker_logging_to_api_enabled: bool = Field(
default=False,
description="Enables the logging of worker logs to Prefect Cloud.",
)

telemetry_enabled: bool = Field(
default=False,
description="Enables sending telemetry to Prefect Cloud.",
Expand Down
4 changes: 2 additions & 2 deletions src/prefect/utilities/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import socket
import urllib.parse
from string import Formatter
from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union
from typing import TYPE_CHECKING, Any, Literal, Optional, Union
from urllib.parse import urlparse
from uuid import UUID

Expand Down Expand Up @@ -135,7 +135,7 @@ def url_for(
obj_id: Optional[Union[str, UUID]] = None,
url_type: URLType = "ui",
default_base_url: Optional[str] = None,
**additional_format_kwargs: Optional[Dict[str, Any]],
**additional_format_kwargs: Any,
) -> Optional[str]:
"""
Returns the URL for a Prefect object.
Expand Down
11 changes: 4 additions & 7 deletions src/prefect/workers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,14 +821,14 @@ async def sync_with_backend(self):
self._logger = get_worker_logger(self)

self._logger.debug(
f"Worker synchronized with the Prefect API server. Remote ID: {self.backend_id}"
"Worker synchronized with the Prefect API server. "
+ (f"Remote ID: {self.backend_id}" if self.backend_id else "")
)

def _should_get_worker_id(self):
"""Determines if the worker should request an ID from the API server."""
return (
get_current_settings().experiments.worker_logging_to_api_enabled
and self._client
self._client
and self._client.server_type == ServerType.CLOUD
and self.backend_id is None
)
Expand Down Expand Up @@ -886,10 +886,7 @@ async def _submit_scheduled_flow_runs(
run_logger.info(
f"Worker '{self.name}' submitting flow run '{flow_run.id}'"
)
if (
get_current_settings().experiments.worker_logging_to_api_enabled
and self.backend_id
):
if self.backend_id:
try:
worker_url = url_for(
"worker",
Expand Down
91 changes: 61 additions & 30 deletions tests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
from prefect.server.schemas.actions import LogCreate
from prefect.settings import (
PREFECT_API_KEY,
PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED,
PREFECT_LOGGING_COLORS,
PREFECT_LOGGING_LEVEL,
PREFECT_LOGGING_MARKUP,
Expand Down Expand Up @@ -637,23 +636,37 @@ def test_handler_knows_how_large_logs_are(self):
assert handler._get_payload_size(dict_log) == log_size


WORKER_ID = uuid.uuid4()


class TestWorkerLogging:
class WorkerTestImpl(BaseWorker):
type: str = "logging_test"
class CloudWorkerTestImpl(BaseWorker):
type: str = "cloud_logging_test"
job_configuration: Type[BaseJobConfiguration] = BaseJobConfiguration

async def _send_worker_heartbeat(self, *_, **__):
return "test_backend_id"
"""
Workers only return an ID here if they're connected to Cloud,
so this simulates the worker being connected to Cloud.
"""
return WORKER_ID

async def run(self, *_, **__):
pass

@pytest.fixture
def experiment_enabled(self):
with temporary_settings(
updates={PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED: True}
):
yield
class ServerWorkerTestImpl(BaseWorker):
type: str = "server_logging_test"
job_configuration: Type[BaseJobConfiguration] = BaseJobConfiguration

async def run(self, *_, **__):
pass

async def _send_worker_heartbeat(self, *_, **__):
"""
Workers only return an ID here if they're connected to Cloud,
so this simulates the worker not being connected to Cloud.
"""
return None

@pytest.fixture
def logging_to_api_enabled(self):
Expand All @@ -672,24 +685,24 @@ def logger(self, worker_handler):
yield logger
logger.removeHandler(worker_handler)

async def test_get_worker_logger_works_with_no_backend_id(self, experiment_enabled):
async with self.WorkerTestImpl(
async def test_get_worker_logger_works_with_no_backend_id(self):
async with self.CloudWorkerTestImpl(
name="test", work_pool_name="test-work-pool"
) as worker:
logger = get_worker_logger(worker)
assert logger.name == "prefect.workers.logging_test.test"
assert logger.name == "prefect.workers.cloud_logging_test.test"

async def test_get_worker_logger_works_with_backend_id(self, experiment_enabled):
async with self.WorkerTestImpl(
async def test_get_worker_logger_works_with_backend_id(self):
async with self.CloudWorkerTestImpl(
name="test", work_pool_name="test-work-pool"
) as worker:
await worker.sync_with_backend()
logger = get_worker_logger(worker)
assert logger.name == "prefect.workers.logging_test.test"
assert logger.extra["worker_id"] == "test_backend_id"
assert logger.name == "prefect.workers.cloud_logging_test.test"
assert logger.extra["worker_id"] == str(WORKER_ID)

async def test_worker_emits_logs_with_worker_id(self, caplog, experiment_enabled):
async with self.WorkerTestImpl(
async def test_worker_emits_logs_with_worker_id(self, caplog):
async with self.CloudWorkerTestImpl(
name="test", work_pool_name="test-work-pool"
) as worker:
await worker.sync_with_backend()
Expand All @@ -700,21 +713,39 @@ async def test_worker_emits_logs_with_worker_id(self, caplog, experiment_enabled
]

assert "testing_with_extras" in caplog.text
assert record_with_extras[0].worker_id == worker.backend_id
assert worker._logger.extra["worker_id"] == worker.backend_id
assert record_with_extras[0].worker_id == str(worker.backend_id)
assert worker._logger.extra["worker_id"] == str(worker.backend_id)

def test_worker_logger_sends_log_to_api_worker(
self, logger, mock_log_worker, experiment_enabled, logging_to_api_enabled
async def test_worker_logger_sends_log_to_api_worker_when_connected_to_cloud(
self, mock_log_worker, worker_handler, logging_to_api_enabled
):
logger.info("test-worker-log")
async with self.CloudWorkerTestImpl(
name="test", work_pool_name="test-work-pool"
) as worker:
await worker.sync_with_backend()
worker._logger.debug("test-worker-log")

log_statement = [
log
for call in mock_log_worker.instance().send.call_args_list
for log in call.args
if log["name"] == worker._logger.name
and log["message"] == "test-worker-log"
]

mock_log_worker.instance().send.assert_called_once()
assert len(mock_log_worker.instance().send.call_args_list) == 1
assert len(log_statement) == 1
assert log_statement[0]["worker_id"] == str(worker.backend_id)

log_statement = mock_log_worker.instance().send.call_args.args[0]
assert log_statement["name"] == logger.name
assert log_statement["level"] == 20
assert log_statement["message"] == "test-worker-log"
async def test_worker_logger_does_not_send_logs_when_not_connected_to_cloud(
self, mock_log_worker, worker_handler, logging_to_api_enabled
):
async with self.ServerWorkerTestImpl(
name="test", work_pool_name="test-work-pool"
) as worker:
assert isinstance(worker._logger, logging.Logger)
worker._logger.debug("test-worker-log")

mock_log_worker.instance().send.assert_not_called()


class TestAPILogWorker:
Expand Down
1 change: 0 additions & 1 deletion tests/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@
"PREFECT_EXPERIMENTAL_WARN": {"test_value": True, "legacy": True},
"PREFECT_EXPERIMENTS_TELEMETRY_ENABLED": {"test_value": False},
"PREFECT_EXPERIMENTS_WARN": {"test_value": True},
"PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED": {"test_value": False},
"PREFECT_FLOW_DEFAULT_RETRIES": {"test_value": 10, "legacy": True},
"PREFECT_FLOWS_DEFAULT_RETRIES": {"test_value": 10},
"PREFECT_FLOW_DEFAULT_RETRY_DELAY_SECONDS": {"test_value": 10, "legacy": True},
Expand Down
16 changes: 3 additions & 13 deletions tests/workers/test_base_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from prefect.server.schemas.responses import DeploymentResponse
from prefect.settings import (
PREFECT_API_URL,
PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED,
PREFECT_TEST_MODE,
PREFECT_WORKER_PREFETCH_SECONDS,
get_current_settings,
Expand Down Expand Up @@ -86,14 +85,6 @@ def no_api_url():
yield


@pytest.fixture
def experimental_logging_enabled():
with temporary_settings(
updates={PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED: True}
):
yield


async def test_worker_requires_api_url_when_not_in_test_mode(no_api_url):
with pytest.raises(ValueError, match="PREFECT_API_URL"):
async with WorkerTestImpl(
Expand Down Expand Up @@ -197,7 +188,7 @@ async def test_worker_sends_heartbeat_gets_id(respx_mock):
assert worker.backend_id == test_worker_id


async def test_worker_sends_heartbeat_only_gets_id_once(experimental_logging_enabled):
async def test_worker_sends_heartbeat_only_gets_id_once():
async with WorkerTestImpl(name="test", work_pool_name="test-work-pool") as worker:
worker._client.server_type = ServerType.CLOUD
mock = AsyncMock(return_value="test")
Expand Down Expand Up @@ -1567,7 +1558,6 @@ async def test_get_flow_run_logger_with_worker_id_set(
prefect_client: PrefectClient,
worker_deployment_wq1,
work_pool,
experimental_logging_enabled,
):
flow_run = await prefect_client.create_flow_run_from_deployment(
worker_deployment_wq1.id
Expand Down Expand Up @@ -1883,7 +1873,7 @@ async def test_env_merge_logic_is_deep(

class TestBaseWorkerHeartbeat:
async def test_worker_heartbeat_sends_integrations(
self, work_pool, hosted_api_server, experimental_logging_enabled
self, work_pool, hosted_api_server
):
async with WorkerTestImpl(work_pool_name=work_pool.name) as worker:
await worker.start(run_once=True)
Expand Down Expand Up @@ -1926,7 +1916,7 @@ async def test_worker_heartbeat_sends_integrations(
assert worker._worker_metadata_sent

async def test_custom_worker_can_send_arbitrary_metadata(
self, work_pool, hosted_api_server, experimental_logging_enabled
self, work_pool, hosted_api_server
):
class CustomWorker(BaseWorker):
type: str = "test-custom-metadata"
Expand Down
Loading