From 1c36731d35259fe0eb1e4fbee86e0d54c7df14a9 Mon Sep 17 00:00:00 2001 From: Rachel Daniel Date: Fri, 18 Nov 2022 10:54:19 -0600 Subject: [PATCH 1/5] Fixes legacy logger --- dbt_server/logging.py | 3 +++ dbt_server/services/task_service.py | 2 ++ dbt_server/views.py | 9 +++------ 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/dbt_server/logging.py b/dbt_server/logging.py index 62e79b6..73fe5ec 100644 --- a/dbt_server/logging.py +++ b/dbt_server/logging.py @@ -111,6 +111,9 @@ def __init__(self, log_path): self.log_path = log_path filesystem_service.ensure_dir_exists(self.log_path) + file_logger = logging.FileHandler(self.log_path) + logger.addHandler(file_logger) + logs_redirect_handler = logbook.FileHandler( filename=self.log_path, diff --git a/dbt_server/services/task_service.py b/dbt_server/services/task_service.py index 792b8f2..fc20c9a 100644 --- a/dbt_server/services/task_service.py +++ b/dbt_server/services/task_service.py @@ -10,6 +10,8 @@ import asyncio import io +from dbt_server.services.logs_service import capture_logs + def run_task(task_name, task_id, args, db): db_task = crud.get_task(db, task_id) diff --git a/dbt_server/views.py b/dbt_server/views.py index 94afe16..1c32d0c 100644 --- a/dbt_server/views.py +++ b/dbt_server/views.py @@ -157,10 +157,10 @@ class ListArgs(BaseModel): exclude: Union[None, str, List[str]] = None select: Union[None, str, List[str]] = None selector_name: Optional[str] = None - output: Optional[str] = "" + output: Optional[str] = "name" output_keys: Union[None, str, List[str]] = None state: Optional[str] = None - indirect_selection: str = "" + indirect_selection: str = "eager" class SnapshotArgs(BaseModel): @@ -312,9 +312,7 @@ async def run_models(args: RunArgs): manifest = dbt_service.deserialize_manifest(serialize_path) results = dbt_service.dbt_run(path, args, manifest) - - encoded_results = jsonable_encoder(results) - + encoded_results = jsonable_encoder(results.to_dict()) return JSONResponse( status_code=200, content={ @@ -413,7 +411,6 @@ async def preview_sql(sql: SQLConfig): compiled_code = helpers.extract_compiled_code_from_node(result) tag_request_span(state) - return JSONResponse( status_code=200, content={ From 652eda79f6702e68a6e08b93db4d6248d46726aa Mon Sep 17 00:00:00 2001 From: Rachel Daniel Date: Tue, 6 Dec 2022 14:18:42 -0600 Subject: [PATCH 2/5] Initial pass at removing legacy logging --- dbt_server/logging.py | 116 ++++++++-------------------- dbt_server/services/logs_service.py | 18 ----- dbt_server/services/task_service.py | 6 +- 3 files changed, 35 insertions(+), 105 deletions(-) delete mode 100644 dbt_server/services/logs_service.py diff --git a/dbt_server/logging.py b/dbt_server/logging.py index 73fe5ec..0f4330d 100644 --- a/dbt_server/logging.py +++ b/dbt_server/logging.py @@ -1,17 +1,16 @@ -import io import json import logging +import uuid import os from dataclasses import dataclass from datetime import datetime from typing import Optional -import logbook -import logbook.queues - -from dbt.events.functions import STDOUT_LOG, FILE_LOG -import dbt.logger as dbt_logger +from dbt.events.functions import EVENT_MANAGER +from dbt.events.eventmgr import LoggerConfig, LineFormat, EventLevel +from dbt.events.base_types import BaseEvent from pythonjsonlogger import jsonlogger +from dbt_server.services import filesystem_service from dbt_server.models import TaskState @@ -21,6 +20,13 @@ ENVIRONMENT_ID = os.environ.get("ENVIRONMENT_ID") WORKSPACE_ID = os.environ.get("WORKSPACE_ID") +dbt_event_to_python_root_log = { + EventLevel.DEBUG: logging.root.debug, + EventLevel.TEST: logging.root.debug, + EventLevel.INFO: logging.root.info, + EventLevel.WARN: logging.root.warn, + EventLevel.ERROR: logging.root.error, +} class CustomJsonFormatter(jsonlogger.JsonFormatter): def add_fields(self, log_record, record, message_dict): @@ -61,11 +67,6 @@ def add_fields(self, log_record, record, message_dict): dbt_server_logger.setLevel(logging.DEBUG) GLOBAL_LOGGER = dbt_server_logger -# remove handlers from these loggers, so -# that they propagate up to the root logger -# for json formatting -STDOUT_LOG.handlers = [] -FILE_LOG.handlers = [] # make sure uvicorn is deferring to the root # logger to format logs @@ -78,6 +79,14 @@ def add_fields(self, log_record, record, message_dict): logger.propagate = True logger_instance.handlers = [] +# Push event messages to stdout for datadog +def log_event_to_console(event: BaseEvent): + logging_method = dbt_event_to_python_root_log[event.log_level()] + # If we want to pass more information along than this (like lineno from core), + # we would need to json format this separately + logging_method(event.info.msg) + +EVENT_MANAGER.callbacks.append(log_event_to_console) def configure_uvicorn_access_log(): """Configure uvicorn access log. @@ -91,10 +100,6 @@ def configure_uvicorn_access_log(): ual.propagate = True ual.handlers = [] - -json_formatter = dbt_logger.JsonFormatter(format_string=dbt_logger.STDOUT_LOG_FORMAT) - - @dataclass class ServerLog: state: TaskState @@ -104,80 +109,23 @@ def to_json(self): return json.dumps(self.__dict__) +# TODO: Make this a contextmanager class LogManager(object): def __init__(self, log_path): - from dbt_server.services import filesystem_service - + self.name = str(uuid.uuid4()) self.log_path = log_path - filesystem_service.ensure_dir_exists(self.log_path) - file_logger = logging.FileHandler(self.log_path) - logger.addHandler(file_logger) - - - logs_redirect_handler = logbook.FileHandler( - filename=self.log_path, - level=logbook.DEBUG, - bubble=True, - # TODO : Do we want to filter these? - filter=self._dbt_logs_only_filter, + logger_config = LoggerConfig( + name=self.name, + line_format=LineFormat.DebugText, + level=EventLevel.INFO, + use_colors=True, + output_file_name=log_path, + # TODO: Add scrubber for secrets ) - - # Big hack? - logs_redirect_handler.formatter = json_formatter - - self.handlers = [ - logs_redirect_handler, - ] - - dbt_logger.log_manager.set_path(None) - - def _dbt_logs_only_filter(self, record, handler): - """ - DUPLICATE OF LogbookStepLogsStreamWriter._dbt_logs_only_filter - """ - return record.channel.split(".")[0] == "dbt" - - def setup_handlers(self): - logger.info("Setting up log handlers...") - - dbt_logger.log_manager.objects = [ - handler - for handler in dbt_logger.log_manager.objects - if type(handler) is not logbook.NullHandler - ] - - handlers = [logbook.NullHandler()] + self.handlers - - self.log_context = logbook.NestedSetup(handlers) - self.log_context.push_application() - - logger.info("Done setting up log handlers.") + EVENT_MANAGER.add_logger(logger_config) def cleanup(self): - self.log_context.pop_application() - - -class CapturingLogManager(LogManager): - def __init__(self, log_path): - super().__init__(log_path) - - self._stream = io.StringIO() - capture_handler = logbook.StreamHandler( - stream=self._stream, - level=logbook.DEBUG, - bubble=True, - filter=self._dbt_logs_only_filter, - ) - - capture_handler.formatter = json_formatter - - self.handlers += [capture_handler] + # TODO: verify that threading doesn't result in wonky list + EVENT_MANAGER.loggers = [log for log in EVENT_MANAGER.loggers if log.name != self.name] - def getLogs(self): - # Be a good citizen with the seek pos - pos = self._stream.tell() - self._stream.seek(0) - res = self._stream.read().split("\n") - self._stream.seek(pos) - return res diff --git a/dbt_server/services/logs_service.py b/dbt_server/services/logs_service.py deleted file mode 100644 index 5c856c1..0000000 --- a/dbt_server/services/logs_service.py +++ /dev/null @@ -1,18 +0,0 @@ -from dbt_server.logging import CapturingLogManager -from contextlib import contextmanager - - -@contextmanager -def capture_logs(log_path, logs): - """ - This captures logs from the yielded (presumably dbt) - invocation and appends them to the `logs` arg - """ - log_manager = CapturingLogManager(log_path) - log_manager.setup_handlers() - try: - yield - - finally: - logs += log_manager.getLogs() - log_manager.cleanup() diff --git a/dbt_server/services/task_service.py b/dbt_server/services/task_service.py index fc20c9a..1326d99 100644 --- a/dbt_server/services/task_service.py +++ b/dbt_server/services/task_service.py @@ -10,8 +10,6 @@ import asyncio import io -from dbt_server.services.logs_service import capture_logs - def run_task(task_name, task_id, args, db): db_task = crud.get_task(db, task_id) @@ -21,8 +19,9 @@ def run_task(task_name, task_id, args, db): log_path = filesystem_service.get_path(args.state_id, task_id, "logs.stdout") log_manager = LogManager(log_path) - log_manager.setup_handlers() + # TODO: Structured logging doesn't have the concept of custom log lines like this, + # need to follow up with core about a way to do this logger.info(f"Running dbt ({task_id}) - deserializing manifest {serialize_path}") manifest = dbt_service.deserialize_manifest(serialize_path) @@ -48,6 +47,7 @@ def run_task(task_name, task_id, args, db): raise RuntimeException("Not an actual task") except RuntimeException as e: crud.set_task_errored(db, db_task, str(e)) + log_manager.cleanup() raise e logger.info(f"Running dbt ({task_id}) - done") From 666e781dc89a95f03528fe4d0827c4b309e3e1db Mon Sep 17 00:00:00 2001 From: Rachel Daniel Date: Wed, 7 Dec 2022 09:12:07 -0600 Subject: [PATCH 3/5] Update log file output to use json format --- dbt_server/logging.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt_server/logging.py b/dbt_server/logging.py index 0f4330d..13b3bc1 100644 --- a/dbt_server/logging.py +++ b/dbt_server/logging.py @@ -117,7 +117,7 @@ def __init__(self, log_path): filesystem_service.ensure_dir_exists(self.log_path) logger_config = LoggerConfig( name=self.name, - line_format=LineFormat.DebugText, + line_format=LineFormat.Json, level=EventLevel.INFO, use_colors=True, output_file_name=log_path, From f9d0c9b981241fed733e3dcf4e2233ac36507f35 Mon Sep 17 00:00:00 2001 From: Rachel Daniel Date: Wed, 7 Dec 2022 10:53:50 -0600 Subject: [PATCH 4/5] Updates server logging to send events --- dbt_server/logging.py | 43 ++++++++++++++++------- dbt_server/server.py | 2 +- dbt_server/services/dbt_service.py | 2 +- dbt_server/services/filesystem_service.py | 2 +- dbt_server/services/task_service.py | 2 +- dbt_server/state.py | 2 +- dbt_server/views.py | 3 +- 7 files changed, 37 insertions(+), 19 deletions(-) diff --git a/dbt_server/logging.py b/dbt_server/logging.py index 13b3bc1..7822c80 100644 --- a/dbt_server/logging.py +++ b/dbt_server/logging.py @@ -12,9 +12,23 @@ from pythonjsonlogger import jsonlogger from dbt_server.services import filesystem_service +from dbt.events import AdapterLogger +from dbt.events.types import ( + AdapterEventDebug, + AdapterEventInfo, + AdapterEventWarning, + AdapterEventError, +) from dbt_server.models import TaskState +DBT_SERVER_EVENT_LOGGER = AdapterLogger("Server") +DBT_SERVER_EVENT_TYPES = [ + AdapterEventDebug, + AdapterEventInfo, + AdapterEventWarning, + AdapterEventError +] ACCOUNT_ID = os.environ.get("ACCOUNT_ID") ENVIRONMENT_ID = os.environ.get("ENVIRONMENT_ID") @@ -45,9 +59,9 @@ def add_fields(self, log_record, record, message_dict): log_record["workspaceID"] = WORKSPACE_ID -# setup json logging +# setup json logging for stdout and datadog logger = logging.getLogger() -logger.setLevel(logging.INFO) +logger.setLevel(logging.DEBUG) stdout = logging.StreamHandler() if os.environ.get("APPLICATION_ENVIRONMENT") in ("dev", None): formatter = logging.Formatter( @@ -63,9 +77,6 @@ def add_fields(self, log_record, record, message_dict): ) stdout.setFormatter(formatter) logger.addHandler(stdout) -dbt_server_logger = logging.getLogger("dbt-server") -dbt_server_logger.setLevel(logging.DEBUG) -GLOBAL_LOGGER = dbt_server_logger # make sure uvicorn is deferring to the root @@ -79,14 +90,6 @@ def add_fields(self, log_record, record, message_dict): logger.propagate = True logger_instance.handlers = [] -# Push event messages to stdout for datadog -def log_event_to_console(event: BaseEvent): - logging_method = dbt_event_to_python_root_log[event.log_level()] - # If we want to pass more information along than this (like lineno from core), - # we would need to json format this separately - logging_method(event.info.msg) - -EVENT_MANAGER.callbacks.append(log_event_to_console) def configure_uvicorn_access_log(): """Configure uvicorn access log. @@ -100,6 +103,20 @@ def configure_uvicorn_access_log(): ual.propagate = True ual.handlers = [] + +# Push event messages to root python logger for formatting +def log_event_to_console(event: BaseEvent): + logging_method = dbt_event_to_python_root_log[event.log_level()] + if type(event) not in DBT_SERVER_EVENT_TYPES and logging_method == logging.root.debug: + # Only log debug level for dbt-server logs + return + logging_method(event.info.msg) + + +EVENT_MANAGER.callbacks.append(log_event_to_console) + + +# TODO: This should be some type of event. We may also choose to send events for all task state updates. @dataclass class ServerLog: state: TaskState diff --git a/dbt_server/server.py b/dbt_server/server.py index 7c78ad9..fcc7bd7 100644 --- a/dbt_server/server.py +++ b/dbt_server/server.py @@ -8,7 +8,7 @@ from dbt_server.database import engine from dbt_server.services import dbt_service, filesystem_service from dbt_server.views import app -from dbt_server.logging import GLOBAL_LOGGER as logger, configure_uvicorn_access_log +from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger, configure_uvicorn_access_log from dbt_server.state import LAST_PARSED from dbt_server.exceptions import StateNotFoundException diff --git a/dbt_server/services/dbt_service.py b/dbt_server/services/dbt_service.py index bc69b32..11108ed 100644 --- a/dbt_server/services/dbt_service.py +++ b/dbt_server/services/dbt_service.py @@ -36,7 +36,7 @@ # dbt Server imports from dbt_server.services import filesystem_service from dbt_server import tracer -from dbt_server.logging import GLOBAL_LOGGER as logger +from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger from dbt_server.exceptions import ( InvalidConfigurationException, diff --git a/dbt_server/services/filesystem_service.py b/dbt_server/services/filesystem_service.py index 80c0b50..587a991 100644 --- a/dbt_server/services/filesystem_service.py +++ b/dbt_server/services/filesystem_service.py @@ -1,6 +1,6 @@ import os import shutil -from dbt_server.logging import GLOBAL_LOGGER as logger +from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger from dbt_server.exceptions import StateNotFoundException from dbt_server import tracer diff --git a/dbt_server/services/task_service.py b/dbt_server/services/task_service.py index 1326d99..f12f36a 100644 --- a/dbt_server/services/task_service.py +++ b/dbt_server/services/task_service.py @@ -3,7 +3,7 @@ from dbt_server import crud, schemas from dbt_server.services import dbt_service, filesystem_service -from dbt_server.logging import GLOBAL_LOGGER as logger, LogManager, ServerLog +from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger, LogManager, ServerLog from dbt_server.models import TaskState from fastapi import HTTPException diff --git a/dbt_server/state.py b/dbt_server/state.py index cf600b3..b46c346 100644 --- a/dbt_server/state.py +++ b/dbt_server/state.py @@ -1,6 +1,6 @@ from dbt_server.services import filesystem_service, dbt_service from dbt_server.exceptions import StateNotFoundException -from dbt_server.logging import GLOBAL_LOGGER as logger +from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger from dbt_server import tracer from dataclasses import dataclass diff --git a/dbt_server/views.py b/dbt_server/views.py index 1c32d0c..4b0b8a2 100644 --- a/dbt_server/views.py +++ b/dbt_server/views.py @@ -28,7 +28,7 @@ InternalException, StateNotFoundException, ) -from dbt_server.logging import GLOBAL_LOGGER as logger +from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger # ORM stuff from sqlalchemy.orm import Session @@ -351,6 +351,7 @@ async def run_models_async( response_model=schemas.Task, db: Session = Depends(crud.get_db), ): + logger.debug("Received run request") return task_service.run_async(background_tasks, db, args) From ac06e1195a9838795316b2f39873187042ba6622 Mon Sep 17 00:00:00 2001 From: Rachel Daniel Date: Wed, 7 Dec 2022 11:10:16 -0600 Subject: [PATCH 5/5] Removes outdated comment --- dbt_server/services/task_service.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/dbt_server/services/task_service.py b/dbt_server/services/task_service.py index f12f36a..7ff074a 100644 --- a/dbt_server/services/task_service.py +++ b/dbt_server/services/task_service.py @@ -20,8 +20,6 @@ def run_task(task_name, task_id, args, db): log_manager = LogManager(log_path) - # TODO: Structured logging doesn't have the concept of custom log lines like this, - # need to follow up with core about a way to do this logger.info(f"Running dbt ({task_id}) - deserializing manifest {serialize_path}") manifest = dbt_service.deserialize_manifest(serialize_path)