diff --git a/dbt_server/logging.py b/dbt_server/logging.py index 62e79b6..7822c80 100644 --- a/dbt_server/logging.py +++ b/dbt_server/logging.py @@ -1,26 +1,46 @@ -import io import json import logging +import uuid import os from dataclasses import dataclass from datetime import datetime from typing import Optional -import logbook -import logbook.queues - -from dbt.events.functions import STDOUT_LOG, FILE_LOG -import dbt.logger as dbt_logger +from dbt.events.functions import EVENT_MANAGER +from dbt.events.eventmgr import LoggerConfig, LineFormat, EventLevel +from dbt.events.base_types import BaseEvent from pythonjsonlogger import jsonlogger +from dbt_server.services import filesystem_service +from dbt.events import AdapterLogger +from dbt.events.types import ( + AdapterEventDebug, + AdapterEventInfo, + AdapterEventWarning, + AdapterEventError, +) from dbt_server.models import TaskState +DBT_SERVER_EVENT_LOGGER = AdapterLogger("Server") +DBT_SERVER_EVENT_TYPES = [ + AdapterEventDebug, + AdapterEventInfo, + AdapterEventWarning, + AdapterEventError +] ACCOUNT_ID = os.environ.get("ACCOUNT_ID") ENVIRONMENT_ID = os.environ.get("ENVIRONMENT_ID") WORKSPACE_ID = os.environ.get("WORKSPACE_ID") +dbt_event_to_python_root_log = { + EventLevel.DEBUG: logging.root.debug, + EventLevel.TEST: logging.root.debug, + EventLevel.INFO: logging.root.info, + EventLevel.WARN: logging.root.warning, + EventLevel.ERROR: logging.root.error, +} class CustomJsonFormatter(jsonlogger.JsonFormatter): def add_fields(self, log_record, record, message_dict): @@ -39,9 +59,9 @@ def add_fields(self, log_record, record, message_dict): log_record["workspaceID"] = WORKSPACE_ID -# setup json logging +# setup json logging for stdout and datadog logger = logging.getLogger() -logger.setLevel(logging.INFO) +logger.setLevel(logging.DEBUG) stdout = logging.StreamHandler() if os.environ.get("APPLICATION_ENVIRONMENT") in ("dev", None): formatter = logging.Formatter( @@ -57,15 +77,7 @@ def add_fields(self, log_record,
record, message_dict): ) stdout.setFormatter(formatter) logger.addHandler(stdout) -dbt_server_logger = logging.getLogger("dbt-server") -dbt_server_logger.setLevel(logging.DEBUG) -GLOBAL_LOGGER = dbt_server_logger -# remove handlers from these loggers, so -# that they propagate up to the root logger -# for json formatting -STDOUT_LOG.handlers = [] -FILE_LOG.handlers = [] # make sure uvicorn is deferring to the root # logger to format logs @@ -92,9 +104,19 @@ def configure_uvicorn_access_log(): ual.handlers = [] -json_formatter = dbt_logger.JsonFormatter(format_string=dbt_logger.STDOUT_LOG_FORMAT) +# Push event messages to root python logger for formatting +def log_event_to_console(event: BaseEvent): + logging_method = dbt_event_to_python_root_log[event.log_level()] + if type(event) not in DBT_SERVER_EVENT_TYPES and logging_method == logging.root.debug: + # Only log debug level for dbt-server logs + return + logging_method(event.info.msg) + + +EVENT_MANAGER.callbacks.append(log_event_to_console) +# TODO: This should be some type of event. We may also choose to send events for all task state updates. @dataclass class ServerLog: state: TaskState @@ -104,77 +126,23 @@ def to_json(self): return json.dumps(self.__dict__) +# TODO: Make this a contextmanager class LogManager(object): def __init__(self, log_path): - from dbt_server.services import filesystem_service - + self.name = str(uuid.uuid4()) self.log_path = log_path - filesystem_service.ensure_dir_exists(self.log_path) - - logs_redirect_handler = logbook.FileHandler( - filename=self.log_path, - level=logbook.DEBUG, - bubble=True, - # TODO : Do we want to filter these? - filter=self._dbt_logs_only_filter, + logger_config = LoggerConfig( + name=self.name, + line_format=LineFormat.Json, + level=EventLevel.INFO, + use_colors=True, + output_file_name=log_path, + # TODO: Add scrubber for secrets ) - - # Big hack? 
- logs_redirect_handler.formatter = json_formatter - - self.handlers = [ - logs_redirect_handler, - ] - - dbt_logger.log_manager.set_path(None) - - def _dbt_logs_only_filter(self, record, handler): - """ - DUPLICATE OF LogbookStepLogsStreamWriter._dbt_logs_only_filter - """ - return record.channel.split(".")[0] == "dbt" - - def setup_handlers(self): - logger.info("Setting up log handlers...") - - dbt_logger.log_manager.objects = [ - handler - for handler in dbt_logger.log_manager.objects - if type(handler) is not logbook.NullHandler - ] - - handlers = [logbook.NullHandler()] + self.handlers - - self.log_context = logbook.NestedSetup(handlers) - self.log_context.push_application() - - logger.info("Done setting up log handlers.") + EVENT_MANAGER.add_logger(logger_config) def cleanup(self): - self.log_context.pop_application() - - -class CapturingLogManager(LogManager): - def __init__(self, log_path): - super().__init__(log_path) - - self._stream = io.StringIO() - capture_handler = logbook.StreamHandler( - stream=self._stream, - level=logbook.DEBUG, - bubble=True, - filter=self._dbt_logs_only_filter, - ) - - capture_handler.formatter = json_formatter - - self.handlers += [capture_handler] + # TODO: verify that threading doesn't result in wonky list + EVENT_MANAGER.loggers = [log for log in EVENT_MANAGER.loggers if log.name != self.name] - def getLogs(self): - # Be a good citizen with the seek pos - pos = self._stream.tell() - self._stream.seek(0) - res = self._stream.read().split("\n") - self._stream.seek(pos) - return res diff --git a/dbt_server/server.py b/dbt_server/server.py index 7c78ad9..fcc7bd7 100644 --- a/dbt_server/server.py +++ b/dbt_server/server.py @@ -8,7 +8,7 @@ from dbt_server.database import engine from dbt_server.services import dbt_service, filesystem_service from dbt_server.views import app -from dbt_server.logging import GLOBAL_LOGGER as logger, configure_uvicorn_access_log +from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger, 
configure_uvicorn_access_log from dbt_server.state import LAST_PARSED from dbt_server.exceptions import StateNotFoundException diff --git a/dbt_server/services/dbt_service.py b/dbt_server/services/dbt_service.py index bc69b32..11108ed 100644 --- a/dbt_server/services/dbt_service.py +++ b/dbt_server/services/dbt_service.py @@ -36,7 +36,7 @@ # dbt Server imports from dbt_server.services import filesystem_service from dbt_server import tracer -from dbt_server.logging import GLOBAL_LOGGER as logger +from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger from dbt_server.exceptions import ( InvalidConfigurationException, diff --git a/dbt_server/services/filesystem_service.py b/dbt_server/services/filesystem_service.py index 80c0b50..587a991 100644 --- a/dbt_server/services/filesystem_service.py +++ b/dbt_server/services/filesystem_service.py @@ -1,6 +1,6 @@ import os import shutil -from dbt_server.logging import GLOBAL_LOGGER as logger +from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger from dbt_server.exceptions import StateNotFoundException from dbt_server import tracer diff --git a/dbt_server/services/logs_service.py b/dbt_server/services/logs_service.py deleted file mode 100644 index 5c856c1..0000000 --- a/dbt_server/services/logs_service.py +++ /dev/null @@ -1,18 +0,0 @@ -from dbt_server.logging import CapturingLogManager -from contextlib import contextmanager - - -@contextmanager -def capture_logs(log_path, logs): - """ - This captures logs from the yielded (presumably dbt) - invocation and appends them to the `logs` arg - """ - log_manager = CapturingLogManager(log_path) - log_manager.setup_handlers() - try: - yield - - finally: - logs += log_manager.getLogs() - log_manager.cleanup() diff --git a/dbt_server/services/task_service.py b/dbt_server/services/task_service.py index 792b8f2..7ff074a 100644 --- a/dbt_server/services/task_service.py +++ b/dbt_server/services/task_service.py @@ -3,7 +3,7 @@ from dbt_server import crud, schemas from 
dbt_server.services import dbt_service, filesystem_service -from dbt_server.logging import GLOBAL_LOGGER as logger, LogManager, ServerLog +from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger, LogManager, ServerLog from dbt_server.models import TaskState from fastapi import HTTPException @@ -19,7 +19,6 @@ def run_task(task_name, task_id, args, db): log_path = filesystem_service.get_path(args.state_id, task_id, "logs.stdout") log_manager = LogManager(log_path) - log_manager.setup_handlers() logger.info(f"Running dbt ({task_id}) - deserializing manifest {serialize_path}") @@ -46,6 +45,7 @@ def run_task(task_name, task_id, args, db): raise RuntimeException("Not an actual task") except RuntimeException as e: crud.set_task_errored(db, db_task, str(e)) + log_manager.cleanup() raise e logger.info(f"Running dbt ({task_id}) - done") diff --git a/dbt_server/state.py b/dbt_server/state.py index cf600b3..b46c346 100644 --- a/dbt_server/state.py +++ b/dbt_server/state.py @@ -1,6 +1,6 @@ from dbt_server.services import filesystem_service, dbt_service from dbt_server.exceptions import StateNotFoundException -from dbt_server.logging import GLOBAL_LOGGER as logger +from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger from dbt_server import tracer from dataclasses import dataclass diff --git a/dbt_server/views.py b/dbt_server/views.py index 94afe16..4b0b8a2 100644 --- a/dbt_server/views.py +++ b/dbt_server/views.py @@ -28,7 +28,7 @@ InternalException, StateNotFoundException, ) -from dbt_server.logging import GLOBAL_LOGGER as logger +from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger # ORM stuff from sqlalchemy.orm import Session @@ -157,10 +157,10 @@ class ListArgs(BaseModel): exclude: Union[None, str, List[str]] = None select: Union[None, str, List[str]] = None selector_name: Optional[str] = None - output: Optional[str] = "" + output: Optional[str] = "name" output_keys: Union[None, str, List[str]] = None state: Optional[str] = None - 
indirect_selection: str = "" + indirect_selection: str = "eager" class SnapshotArgs(BaseModel): @@ -312,9 +312,7 @@ async def run_models(args: RunArgs): manifest = dbt_service.deserialize_manifest(serialize_path) results = dbt_service.dbt_run(path, args, manifest) - - encoded_results = jsonable_encoder(results) - + encoded_results = jsonable_encoder(results.to_dict()) return JSONResponse( status_code=200, content={ @@ -353,6 +351,7 @@ async def run_models_async( response_model=schemas.Task, db: Session = Depends(crud.get_db), ): + logger.debug("Received run request") return task_service.run_async(background_tasks, db, args) @@ -413,7 +412,6 @@ async def preview_sql(sql: SQLConfig): compiled_code = helpers.extract_compiled_code_from_node(result) tag_request_span(state) - return JSONResponse( status_code=200, content={