Racheld/structured logging discovery #131

Closed · wants to merge 5 commits

134 changes: 51 additions & 83 deletions dbt_server/logging.py
@@ -1,26 +1,46 @@
import io
import json
import logging
import uuid
import os
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import logbook
import logbook.queues

from dbt.events.functions import STDOUT_LOG, FILE_LOG
import dbt.logger as dbt_logger
from dbt.events.functions import EVENT_MANAGER
Comment (Contributor): @peterallenwebb we probably want to move EVENT_MANAGER into a contextvar for this kind of use case, if we plan to have concurrent requests handled by the same dbt-server container.

Reply: @ChenyuLInx Thanks for opening dbt-labs/dbt-core#6399! Love to see it.
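
[Editor's note: a minimal sketch of the contextvar idea raised above, assuming dbt-core's EventManager class from dbt.events.eventmgr; this is illustrative, not code from this PR.]

```python
# Sketch only: one event manager per request context instead of a module-level
# singleton, so concurrent requests don't share each other's loggers/callbacks.
from contextvars import ContextVar

from dbt.events.eventmgr import EventManager  # assumed import path

_EVENT_MANAGER: ContextVar[EventManager] = ContextVar("dbt_event_manager")

def get_event_manager() -> EventManager:
    # Each request (asyncio task) sees its own manager; the first access
    # in a fresh context lazily creates one.
    try:
        return _EVENT_MANAGER.get()
    except LookupError:
        manager = EventManager()
        _EVENT_MANAGER.set(manager)
        return manager
```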

from dbt.events.eventmgr import LoggerConfig, LineFormat, EventLevel
from dbt.events.base_types import BaseEvent
from pythonjsonlogger import jsonlogger
from dbt_server.services import filesystem_service

from dbt.events import AdapterLogger
from dbt.events.types import (
    AdapterEventDebug,
    AdapterEventInfo,
    AdapterEventWarning,
    AdapterEventError,
)

from dbt_server.models import TaskState

DBT_SERVER_EVENT_LOGGER = AdapterLogger("Server")
DBT_SERVER_EVENT_TYPES = [
    AdapterEventDebug,
    AdapterEventInfo,
    AdapterEventWarning,
    AdapterEventError
]

ACCOUNT_ID = os.environ.get("ACCOUNT_ID")
ENVIRONMENT_ID = os.environ.get("ENVIRONMENT_ID")
WORKSPACE_ID = os.environ.get("WORKSPACE_ID")

dbt_event_to_python_root_log = {
    EventLevel.DEBUG: logging.root.debug,
    EventLevel.TEST: logging.root.debug,
    EventLevel.INFO: logging.root.info,
    EventLevel.WARN: logging.root.warning,
    EventLevel.ERROR: logging.root.error,
}

class CustomJsonFormatter(jsonlogger.JsonFormatter):
    def add_fields(self, log_record, record, message_dict):
@@ -39,9 +59,9 @@ def add_fields(self, log_record, record, message_dict):
        log_record["workspaceID"] = WORKSPACE_ID


# setup json logging
# setup json logging for stdout and datadog
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.setLevel(logging.DEBUG)
stdout = logging.StreamHandler()
if os.environ.get("APPLICATION_ENVIRONMENT") in ("dev", None):
    formatter = logging.Formatter(
@@ -57,15 +77,7 @@ def add_fields(self, log_record, record, message_dict):
    )
stdout.setFormatter(formatter)
logger.addHandler(stdout)
dbt_server_logger = logging.getLogger("dbt-server")
dbt_server_logger.setLevel(logging.DEBUG)
GLOBAL_LOGGER = dbt_server_logger

# remove handlers from these loggers, so
# that they propagate up to the root logger
# for json formatting
STDOUT_LOG.handlers = []
FILE_LOG.handlers = []

# make sure uvicorn is deferring to the root
# logger to format logs
@@ -92,9 +104,19 @@ def configure_uvicorn_access_log():
    ual.handlers = []


json_formatter = dbt_logger.JsonFormatter(format_string=dbt_logger.STDOUT_LOG_FORMAT)
# Push event messages to root python logger for formatting
def log_event_to_console(event: BaseEvent):
    logging_method = dbt_event_to_python_root_log[event.log_level()]
    if type(event) not in DBT_SERVER_EVENT_TYPES and logging_method == logging.root.debug:
        # Only log debug level for dbt-server logs
        return
    logging_method(event.info.msg)


EVENT_MANAGER.callbacks.append(log_event_to_console)
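
[Editor's note: how a message travels through this hook — DBT_SERVER_EVENT_LOGGER is a dbt AdapterLogger, so its logging methods fire adapter events through EVENT_MANAGER, which calls log_event_to_console, which hands the message to the root python logger and hence the JSON formatter configured above. A hedged illustration, not code from this PR:]

```python
# Illustrative only: these calls emit AdapterEvent* events, which the callback
# above routes to logging.root at the matching level.
DBT_SERVER_EVENT_LOGGER.info("Received run request")     # -> logging.root.info(...)
DBT_SERVER_EVENT_LOGGER.debug("deserializing manifest")  # -> logging.root.debug(...)

# dbt-core's own DEBUG-level events are not in DBT_SERVER_EVENT_TYPES, so the
# early return in log_event_to_console drops them; INFO and above still pass.
```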


# TODO: This should be some type of event. We may also choose to send events for all task state updates.
@dataclass
class ServerLog:
    state: TaskState
@@ -104,77 +126,23 @@ def to_json(self):
        return json.dumps(self.__dict__)


# TODO: Make this a contextmanager
class LogManager(object):
    def __init__(self, log_path):
        from dbt_server.services import filesystem_service

        self.name = str(uuid.uuid4())
        self.log_path = log_path

        filesystem_service.ensure_dir_exists(self.log_path)

        logs_redirect_handler = logbook.FileHandler(
            filename=self.log_path,
            level=logbook.DEBUG,
            bubble=True,
            # TODO: Do we want to filter these?
            filter=self._dbt_logs_only_filter,
        logger_config = LoggerConfig(
            name=self.name,
            line_format=LineFormat.Json,
            level=EventLevel.INFO,
            use_colors=True,
            output_file_name=log_path,

Comment (Contributor): For my understanding: we are writing the logs to a local file, and then the stream-log endpoint reads that file and sends SSE to the WS controller?

Reply (Contributor, Author): Yes, that is currently how it works! This just brings us to parity with how the server logging worked before; we aren't set on this implementation.

Reply: My goal here would be finding a straightforward way to get this working for now, while preserving future options where we push events elsewhere / elsehow. This approach sounds like it fits the bill. (A sketch of this tail-and-stream flow follows the logging.py diff below.)

            # TODO: Add scrubber for secrets
        )

        # Big hack?
        logs_redirect_handler.formatter = json_formatter

        self.handlers = [
            logs_redirect_handler,
        ]

        dbt_logger.log_manager.set_path(None)

    def _dbt_logs_only_filter(self, record, handler):
        """
        DUPLICATE OF LogbookStepLogsStreamWriter._dbt_logs_only_filter
        """
        return record.channel.split(".")[0] == "dbt"

    def setup_handlers(self):
        logger.info("Setting up log handlers...")

        dbt_logger.log_manager.objects = [
            handler
            for handler in dbt_logger.log_manager.objects
            if type(handler) is not logbook.NullHandler
        ]

        handlers = [logbook.NullHandler()] + self.handlers

        self.log_context = logbook.NestedSetup(handlers)
        self.log_context.push_application()

        logger.info("Done setting up log handlers.")
        EVENT_MANAGER.add_logger(logger_config)

    def cleanup(self):
        self.log_context.pop_application()


class CapturingLogManager(LogManager):
    def __init__(self, log_path):
        super().__init__(log_path)

        self._stream = io.StringIO()
        capture_handler = logbook.StreamHandler(
            stream=self._stream,
            level=logbook.DEBUG,
            bubble=True,
            filter=self._dbt_logs_only_filter,
        )

        capture_handler.formatter = json_formatter

        self.handlers += [capture_handler]
        # TODO: verify that threading doesn't result in wonky list
        EVENT_MANAGER.loggers = [log for log in EVENT_MANAGER.loggers if log.name != self.name]

    def getLogs(self):
        # Be a good citizen with the seek pos
        pos = self._stream.tell()
        self._stream.seek(0)
        res = self._stream.read().split("\n")
        self._stream.seek(pos)
        return res
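
[Editor's note: to make the stream-log discussion above concrete, a hypothetical sketch of the tail-and-stream flow — an endpoint tails the JSON log file that LogManager writes and re-emits each line as a server-sent event. The route name and the path_for helper are assumptions for illustration, not part of this PR.]

```python
# Hypothetical sketch; not this PR's code.
import asyncio

from fastapi.responses import StreamingResponse

async def tail_log_file(log_path: str):
    # Blocking reads are fine for a sketch; a real version might use aiofiles
    # and a stop condition once the task reaches a terminal state.
    with open(log_path) as f:
        while True:
            line = f.readline()
            if not line:
                await asyncio.sleep(0.25)  # wait for the running task to write more
                continue
            yield f"data: {line.rstrip()}\n\n"  # SSE framing: one event per JSON log line

# Illustrative route:
# @app.get("/stream-logs/{task_id}")
# async def stream_logs(task_id: str):
#     log_path = path_for(task_id)  # path_for() is a hypothetical helper
#     return StreamingResponse(tail_log_file(log_path), media_type="text/event-stream")
```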
2 changes: 1 addition & 1 deletion dbt_server/server.py
@@ -8,7 +8,7 @@
from dbt_server.database import engine
from dbt_server.services import dbt_service, filesystem_service
from dbt_server.views import app
from dbt_server.logging import GLOBAL_LOGGER as logger, configure_uvicorn_access_log
from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger, configure_uvicorn_access_log
from dbt_server.state import LAST_PARSED
from dbt_server.exceptions import StateNotFoundException

2 changes: 1 addition & 1 deletion dbt_server/services/dbt_service.py
@@ -36,7 +36,7 @@
# dbt Server imports
from dbt_server.services import filesystem_service
from dbt_server import tracer
from dbt_server.logging import GLOBAL_LOGGER as logger
from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger

from dbt_server.exceptions import (
InvalidConfigurationException,
2 changes: 1 addition & 1 deletion dbt_server/services/filesystem_service.py
@@ -1,6 +1,6 @@
import os
import shutil
from dbt_server.logging import GLOBAL_LOGGER as logger
from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger
from dbt_server.exceptions import StateNotFoundException
from dbt_server import tracer

18 changes: 0 additions & 18 deletions dbt_server/services/logs_service.py

This file was deleted.

4 changes: 2 additions & 2 deletions dbt_server/services/task_service.py
@@ -3,7 +3,7 @@

from dbt_server import crud, schemas
from dbt_server.services import dbt_service, filesystem_service
from dbt_server.logging import GLOBAL_LOGGER as logger, LogManager, ServerLog
from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger, LogManager, ServerLog
from dbt_server.models import TaskState

from fastapi import HTTPException
@@ -19,7 +19,6 @@ def run_task(task_name, task_id, args, db):
    log_path = filesystem_service.get_path(args.state_id, task_id, "logs.stdout")

    log_manager = LogManager(log_path)
    log_manager.setup_handlers()

    logger.info(f"Running dbt ({task_id}) - deserializing manifest {serialize_path}")

@@ -46,6 +45,7 @@ def run_task(task_name, task_id, args, db):
        raise RuntimeException("Not an actual task")
    except RuntimeException as e:
        crud.set_task_errored(db, db_task, str(e))
        log_manager.cleanup()
        raise e

    logger.info(f"Running dbt ({task_id}) - done")
2 changes: 1 addition & 1 deletion dbt_server/state.py
@@ -1,6 +1,6 @@
from dbt_server.services import filesystem_service, dbt_service
from dbt_server.exceptions import StateNotFoundException
from dbt_server.logging import GLOBAL_LOGGER as logger
from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger
from dbt_server import tracer

from dataclasses import dataclass
12 changes: 5 additions & 7 deletions dbt_server/views.py
@@ -28,7 +28,7 @@
InternalException,
StateNotFoundException,
)
from dbt_server.logging import GLOBAL_LOGGER as logger
from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger

# ORM stuff
from sqlalchemy.orm import Session
@@ -157,10 +157,10 @@ class ListArgs(BaseModel):
    exclude: Union[None, str, List[str]] = None
    select: Union[None, str, List[str]] = None
    selector_name: Optional[str] = None
    output: Optional[str] = ""
    output: Optional[str] = "name"
    output_keys: Union[None, str, List[str]] = None
    state: Optional[str] = None
    indirect_selection: str = ""
    indirect_selection: str = "eager"


class SnapshotArgs(BaseModel):
@@ -312,9 +312,7 @@ async def run_models(args: RunArgs):

    manifest = dbt_service.deserialize_manifest(serialize_path)
    results = dbt_service.dbt_run(path, args, manifest)

    encoded_results = jsonable_encoder(results)

    encoded_results = jsonable_encoder(results.to_dict())
    return JSONResponse(
        status_code=200,
        content={
@@ -353,6 +351,7 @@ async def run_models_async(
    response_model=schemas.Task,
    db: Session = Depends(crud.get_db),
):
    logger.debug("Received run request")
    return task_service.run_async(background_tasks, db, args)


@@ -413,7 +412,6 @@ async def preview_sql(sql: SQLConfig):
    compiled_code = helpers.extract_compiled_code_from_node(result)

    tag_request_span(state)

    return JSONResponse(
        status_code=200,
        content={