Skip to content

Commit

Permalink
Ct 1191 event history cleanup (#5858)
Browse files Browse the repository at this point in the history
* Change Exceptions in events to strings. Refactor event_buffer_handling.

* Changie

* fix fire_event call MainEncounteredError

* Set EventBufferFull message when event buffer >= 10,000
  • Loading branch information
gshank authored Sep 20, 2022
1 parent 646a0c7 commit 7b1d61c
Show file tree
Hide file tree
Showing 15 changed files with 103 additions and 87 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20220916-091723.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Prevent event_history from holding references
time: 2022-09-16T09:17:23.273847-04:00
custom:
Author: gshank
Issue: "5848"
PR: "5858"
2 changes: 1 addition & 1 deletion core/dbt/adapters/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def load_plugin(self, name: str) -> Type[Credentials]:
# if we failed to import the target module in particular, inform
# the user about it via a runtime error
if exc.name == "dbt.adapters." + name:
fire_event(AdapterImportError(exc=exc))
fire_event(AdapterImportError(exc=str(exc)))
raise RuntimeException(f"Could not find adapter type {name}!")
# otherwise, the error had to have come from some underlying
# library. Log the stack trace.
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/clients/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def write_file(path: str, contents: str = "") -> bool:
reason = "Path was possibly too long"
# all our hard work and the path was still too long. Log and
# continue.
fire_event(SystemCouldNotWrite(path=path, reason=reason, exc=exc))
fire_event(SystemCouldNotWrite(path=path, reason=reason, exc=str(exc)))
else:
raise
return True
Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ class UserConfig(ExtensibleDbtClassMixin, Replaceable, UserConfigContract):
static_parser: Optional[bool] = None
indirect_selection: Optional[str] = None
cache_selected_only: Optional[bool] = None
event_buffer_size: Optional[int] = None


@dataclass
Expand Down
42 changes: 20 additions & 22 deletions core/dbt/events/functions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from colorama import Style
import dbt.events.functions as this # don't worry I hate it too.
from dbt.events.base_types import NoStdOut, Event, NoFile, ShowException, Cache
from dbt.events.types import EventBufferFull, T_Event, MainReportVersion, EmptyLine
from dbt.events.types import T_Event, MainReportVersion, EmptyLine, EventBufferFull
import dbt.flags as flags
from dbt.constants import SECRET_ENV_PREFIX

Expand All @@ -22,24 +22,16 @@
from typing import Any, Dict, List, Optional, Union
from collections import deque

global LOG_VERSION
LOG_VERSION = 2

# create the global event history buffer with the default max size (10k)
# python 3.7 doesn't support type hints on globals, but mypy requires them. hence the ignore.
# TODO the flags module has not yet been resolved when this is created
global EVENT_HISTORY
EVENT_HISTORY = deque(maxlen=flags.EVENT_BUFFER_SIZE) # type: ignore
EVENT_HISTORY = None

# create the global file logger with no configuration
global FILE_LOG
FILE_LOG = logging.getLogger("default_file")
null_handler = logging.NullHandler()
FILE_LOG.addHandler(null_handler)

# set up logger to go to stdout with defaults
# setup_event_logger will be called once args have been parsed
global STDOUT_LOG
STDOUT_LOG = logging.getLogger("default_stdout")
STDOUT_LOG.setLevel(logging.INFO)
stdout_handler = logging.StreamHandler(sys.stdout)
Expand All @@ -52,10 +44,6 @@


def setup_event_logger(log_path, level_override=None):
# flags have been resolved, and log_path is known
global EVENT_HISTORY
EVENT_HISTORY = deque(maxlen=flags.EVENT_BUFFER_SIZE) # type: ignore

make_log_dir_if_missing(log_path)

this.format_json = flags.LOG_FORMAT == "json"
Expand Down Expand Up @@ -271,14 +259,7 @@ def fire_event(e: Event) -> None:
if isinstance(e, Cache) and not flags.LOG_CACHE_EVENTS:
return

# if and only if the event history deque will be completely filled by this event
# fire warning that old events are now being dropped
global EVENT_HISTORY
if len(EVENT_HISTORY) == (flags.EVENT_BUFFER_SIZE - 1):
EVENT_HISTORY.append(e)
fire_event(EventBufferFull())
else:
EVENT_HISTORY.append(e)
add_to_event_history(e)

# backwards compatibility for plugins that require old logger (dbt-rpc)
if flags.ENABLE_LEGACY_LOGGER:
Expand Down Expand Up @@ -344,3 +325,20 @@ def get_ts_rfc3339() -> str:
ts = get_ts()
ts_rfc3339 = ts.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
return ts_rfc3339


def add_to_event_history(event):
    """Record ``event`` in the module-level ``EVENT_HISTORY`` buffer.

    Nothing is stored when ``flags.EVENT_BUFFER_SIZE`` is 0. The buffer is
    created on first use through ``reset_event_history()``.

    After the append, an ``EventBufferFull`` event is fired when the
    configured size is at least 10,000 and the buffer holds exactly one
    entry fewer than that size.
    """
    global EVENT_HISTORY
    capacity = flags.EVENT_BUFFER_SIZE

    # A buffer size of zero disables event history altogether.
    if capacity == 0:
        return

    # Lazily build the buffer the first time an event is recorded.
    if EVENT_HISTORY is None:
        reset_event_history()
    EVENT_HISTORY.append(event)

    # We only set the EventBufferFull message for event buffers >= 10,000
    if capacity < 10000:
        return
    if len(EVENT_HISTORY) == capacity - 1:
        fire_event(EventBufferFull())


def reset_event_history():
    """Replace the module-level ``EVENT_HISTORY`` with a fresh, empty deque.

    The new deque's ``maxlen`` is taken from ``flags.EVENT_BUFFER_SIZE`` at
    the time of the call; whatever was previously bound to ``EVENT_HISTORY``
    is no longer referenced by this module.
    """
    global EVENT_HISTORY
    capacity = flags.EVENT_BUFFER_SIZE
    EVENT_HISTORY = deque(maxlen=capacity)
60 changes: 30 additions & 30 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ def message(self) -> str:

@dataclass
class MainEncounteredError(ErrorLevel):
e: BaseException
exc: str
code: str = "Z002"

def message(self) -> str:
return f"Encountered an error:\n{self.e}"
return f"Encountered an error:\n{self.exc}"


@dataclass
Expand Down Expand Up @@ -382,7 +382,7 @@ def message(self) -> str:
class SystemCouldNotWrite(DebugLevel):
path: str
reason: str
exc: Exception
exc: str
code: str = "Z005"

def message(self) -> str:
Expand Down Expand Up @@ -762,7 +762,7 @@ def message(self) -> str:

@dataclass
class AdapterImportError(InfoLevel):
exc: Exception
exc: str
code: str = "E035"

def message(self) -> str:
Expand Down Expand Up @@ -1008,7 +1008,7 @@ def message(self) -> str:
@dataclass
class ParsedFileLoadFailed(ShowException, DebugLevel):
path: str
exc: Exception
exc: str
code: str = "I029"

def message(self) -> str:
Expand Down Expand Up @@ -1223,7 +1223,7 @@ def message(self) -> str:

@dataclass
class RunningOperationCaughtError(ErrorLevel):
exc: Exception
exc: str
code: str = "Q001"

def message(self) -> str:
Expand All @@ -1232,7 +1232,7 @@ def message(self) -> str:

@dataclass
class RunningOperationUncaughtError(ErrorLevel):
exc: Exception
exc: str
code: str = "FF01"

def message(self) -> str:
Expand All @@ -1249,7 +1249,7 @@ def message(self) -> str:

@dataclass
class DbtProjectErrorException(ErrorLevel):
exc: Exception
exc: str
code: str = "A010"

def message(self) -> str:
Expand All @@ -1266,7 +1266,7 @@ def message(self) -> str:

@dataclass
class DbtProfileErrorException(ErrorLevel):
exc: Exception
exc: str
code: str = "A012"

def message(self) -> str:
Expand Down Expand Up @@ -1313,7 +1313,7 @@ def message(self) -> str:

@dataclass
class CatchableExceptionOnRun(ShowException, DebugLevel):
exc: Exception
exc: str
code: str = "W002"

def message(self) -> str:
Expand All @@ -1323,7 +1323,7 @@ def message(self) -> str:
@dataclass
class InternalExceptionOnRun(DebugLevel):
build_path: str
exc: Exception
exc: str
code: str = "W003"

def message(self) -> str:
Expand Down Expand Up @@ -1352,7 +1352,7 @@ def message(self) -> str:
class GenericExceptionOnRun(ErrorLevel):
build_path: Optional[str]
unique_id: str
exc: Exception
exc: str
code: str = "W004"

def message(self) -> str:
Expand All @@ -1366,7 +1366,7 @@ def message(self) -> str:
@dataclass
class NodeConnectionReleaseError(ShowException, DebugLevel):
node_name: str
exc: Exception
exc: str
code: str = "W005"

def message(self) -> str:
Expand Down Expand Up @@ -1700,7 +1700,7 @@ def message(self) -> str:

@dataclass
class SQlRunnerException(ShowException, DebugLevel):
exc: Exception
exc: str
code: str = "Q006"

def message(self) -> str:
Expand Down Expand Up @@ -2458,7 +2458,7 @@ def message(self) -> str:

@dataclass
class GeneralWarningException(WarnLevel):
exc: Exception
exc: str
log_fmt: str
code: str = "Z047"

Expand All @@ -2479,7 +2479,7 @@ def message(self) -> str:

@dataclass
class RecordRetryException(DebugLevel):
exc: Exception
exc: str
code: str = "M021"

def message(self) -> str:
Expand All @@ -2495,7 +2495,7 @@ def message(self) -> str:
if 1 == 0:
MainReportVersion(v="")
MainKeyboardInterrupt()
MainEncounteredError(e=BaseException(""))
MainEncounteredError(exc="")
MainStackTrace(stack_trace="")
MainTrackingUserState(user_state="")
ParsingStart()
Expand Down Expand Up @@ -2524,7 +2524,7 @@ def message(self) -> str:
RegistryResponseMissingNestedKeys(response=""),
RegistryResponseExtraNestedKeys(response=""),
SystemErrorRetrievingModTime(path="")
SystemCouldNotWrite(path="", reason="", exc=Exception(""))
SystemCouldNotWrite(path="", reason="", exc="")
SystemExecutingCmd(cmd=[""])
SystemStdOutMsg(bmsg=b"")
SystemStdErrMsg(bmsg=b"")
Expand Down Expand Up @@ -2580,7 +2580,7 @@ def message(self) -> str:
DumpAfterAddGraph(Lazy.defer(lambda: dict()))
DumpBeforeRenameSchema(Lazy.defer(lambda: dict()))
DumpAfterRenameSchema(Lazy.defer(lambda: dict()))
AdapterImportError(exc=Exception())
AdapterImportError(exc="")
PluginLoadError()
SystemReportReturnCode(returncode=0)
NewConnectionOpening(connection_state="")
Expand All @@ -2603,7 +2603,7 @@ def message(self) -> str:
PartialParsingFailedBecauseNewProjectDependency()
PartialParsingFailedBecauseHashChanged()
PartialParsingDeletedMetric(id="")
ParsedFileLoadFailed(path="", exc=Exception(""))
ParsedFileLoadFailed(path="", exc="")
PartialParseSaveFileNotFound()
StaticParserCausedJinjaRendering(path="")
UsingExperimentalParser(path="")
Expand All @@ -2626,20 +2626,20 @@ def message(self) -> str:
PartialParsingDeletedExposure(unique_id="")
InvalidDisabledSourceInTestNode(msg="")
InvalidRefInTestNode(msg="")
RunningOperationCaughtError(exc=Exception(""))
RunningOperationUncaughtError(exc=Exception(""))
RunningOperationCaughtError(exc="")
RunningOperationUncaughtError(exc="")
DbtProjectError()
DbtProjectErrorException(exc=Exception(""))
DbtProjectErrorException(exc="")
DbtProfileError()
DbtProfileErrorException(exc=Exception(""))
DbtProfileErrorException(exc="")
ProfileListTitle()
ListSingleProfile(profile="")
NoDefinedProfiles()
ProfileHelpMessage()
CatchableExceptionOnRun(exc=Exception(""))
InternalExceptionOnRun(build_path="", exc=Exception(""))
GenericExceptionOnRun(build_path="", unique_id="", exc=Exception(""))
NodeConnectionReleaseError(node_name="", exc=Exception(""))
CatchableExceptionOnRun(exc="")
InternalExceptionOnRun(build_path="", exc="")
GenericExceptionOnRun(build_path="", unique_id="", exc="")
NodeConnectionReleaseError(node_name="", exc="")
CheckCleanPath(path="")
ConfirmCleanPath(path="")
ProtectedCleanPath(path="")
Expand Down Expand Up @@ -2847,6 +2847,6 @@ def message(self) -> str:
TrackingInitializeFailure()
RetryExternalCall(attempt=0, max=0)
GeneralWarningMsg(msg="", log_fmt="")
GeneralWarningException(exc=Exception(""), log_fmt="")
GeneralWarningException(exc="", log_fmt="")
EventBufferFull()
RecordRetryException(exc=Exception(""))
RecordRetryException(exc="")
2 changes: 1 addition & 1 deletion core/dbt/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1098,7 +1098,7 @@ def warn_or_raise(exc, log_fmt=None):
if flags.WARN_ERROR:
raise exc
else:
fire_event(GeneralWarningException(exc=exc, log_fmt=log_fmt))
fire_event(GeneralWarningException(exc=str(exc), log_fmt=log_fmt))


def warn(msg, node=None):
Expand Down
4 changes: 2 additions & 2 deletions core/dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def main(args=None):
exit_code = e.code

except BaseException as e:
fire_event(MainEncounteredError(e=str(e)))
fire_event(MainEncounteredError(exc=str(e)))
fire_event(MainStackTrace(stack_trace=traceback.format_exc()))
exit_code = ExitCodes.UnhandledError.value

Expand Down Expand Up @@ -201,7 +201,7 @@ def track_run(task):
yield
dbt.tracking.track_invocation_end(config=task.config, args=task.args, result_type="ok")
except (NotImplementedException, FailedToConnectException) as e:
fire_event(MainEncounteredError(e=str(e)))
fire_event(MainEncounteredError(exc=str(e)))
dbt.tracking.track_invocation_end(config=task.config, args=task.args, result_type="error")
except Exception:
dbt.tracking.track_invocation_end(config=task.config, args=task.args, result_type="error")
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ def read_manifest_for_partial_parse(self) -> Optional[Manifest]:
manifest.metadata.invocation_id = get_invocation_id()
return manifest
except Exception as exc:
fire_event(ParsedFileLoadFailed(path=path, exc=exc))
fire_event(ParsedFileLoadFailed(path=path, exc=str(exc)))
reparse_reason = ReparseReason.load_file_failure
else:
fire_event(PartialParseSaveFileNotFound())
Expand Down
Loading

0 comments on commit 7b1d61c

Please sign in to comment.