Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ingest): Refactor structured logging to support infos, warnings, and failures structured reporting to UI #10828

Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
4a637b0
Adding structured log reporting to ingestion framework:
Jul 2, 2024
856731c
Merge remote-tracking branch 'acryl/jj--add-structured-logging-to-ing…
Jul 2, 2024
5bf5d73
Adding final reporting method support
Jul 2, 2024
5461cff
Merge remote-tracking branch 'acryl/jj--add-structured-logging-to-ing…
Jul 2, 2024
7d580e5
Yeah
Jul 2, 2024
fd7357a
Merge remote-tracking branch 'acryl/jj--add-structured-logging-to-ing…
Jul 2, 2024
b8e5382
Adding refactoring
Jul 2, 2024
b5bfe6c
Merge remote-tracking branch 'acryl/jj--add-structured-logging-to-ing…
Jul 2, 2024
bd4b3ff
Adding title, making literalstring requirement
Jul 2, 2024
47445c8
Merge remote-tracking branch 'acryl/jj--add-structured-logging-to-ing…
Jul 2, 2024
e897ede
type -> title
hsheth2 Jul 2, 2024
a667cf8
Fix final occurrences of type
Jul 2, 2024
da0739e
Adding prettier and supporting new log fields from ingest
Jul 2, 2024
023ad85
Merge remote-tracking branch 'acryl/jj--add-structured-logging-to-ing…
Jul 2, 2024
839389b
add structured logs type
hsheth2 Jul 3, 2024
6504bd4
Merge branch 'jj--add-structured-logging-to-ingestion' of ssh://githu…
hsheth2 Jul 3, 2024
9cd2035
Test failures pause
Jul 3, 2024
41757e6
Merge remote-tracking branch 'acryl/jj--add-structured-logging-to-ing…
Jul 3, 2024
35a8bdc
fix logs
hsheth2 Jul 3, 2024
fb47657
Adding test fixes
Jul 3, 2024
17d1ddb
Fixing json
Jul 3, 2024
41d817e
Merge remote-tracking branch 'acryl/jj--add-structured-logging-to-ing…
Jul 3, 2024
99e7f14
Adding frontend handling
Jul 3, 2024
a31173c
fix lossy list in report
hsheth2 Jul 3, 2024
7b08229
Merge branch 'jj--add-structured-logging-to-ingestion' of ssh://githu…
hsheth2 Jul 3, 2024
3f4c9b2
Merge remote-tracking branch 'acryl/jj--add-structured-logging-to-ing…
Jul 3, 2024
3c18e1a
fix failure reporting bug
hsheth2 Jul 3, 2024
c2f2aff
Merge remote-tracking branch 'acryl/jj--add-structured-logging-to-ing…
Jul 3, 2024
662c6d0
Final touches to make things work
Jul 3, 2024
d72da4a
Merge remote-tracking branch 'acryl/jj--add-structured-logging-to-ing…
Jul 3, 2024
2dfef8d
Fix mode tests
Jul 3, 2024
6547a5a
Merge remote-tracking branch 'acryl/jj--add-structured-logging-to-ing…
Jul 3, 2024
245aa4b
Redshift to DataHub
Jul 3, 2024
d6650a2
Merge remote-tracking branch 'acryl/jj--add-structured-logging-to-ing…
Jul 3, 2024
a1adbe9
Addressing comments
Jul 3, 2024
aabe311
Merge remote-tracking branch 'acryl/jj--add-structured-logging-to-ing…
Jul 3, 2024
0eaa0a9
Adding source utils
Jul 3, 2024
01d0263
Merge remote-tracking branch 'acryl/jj--add-structured-logging-to-ing…
Jul 3, 2024
cadc4f6
Fix the build
Jul 3, 2024
3ac4b42
Merge remote-tracking branch 'acryl/jj--add-structured-logging-to-ing…
Jul 3, 2024
49d3774
frontend lint
Jul 4, 2024
8fc1b39
Merge remote-tracking branch 'acryl/jj--add-structured-logging-to-ing…
Jul 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 188 additions & 19 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)

from pydantic import BaseModel
from typing_extensions import LiteralString

from datahub.configuration.common import ConfigModel
from datahub.configuration.source_common import PlatformInstanceConfigMixin
Expand Down Expand Up @@ -62,6 +63,21 @@ class SourceCapability(Enum):
CLASSIFICATION = "Classification"


class StructuredLogLevel(Enum):
    """Severity levels for structured ingestion logs reported to the UI."""

    INFO = "INFO"
    WARN = "WARN"
    ERROR = "ERROR"


@dataclass
class StructuredLog(Report):
    """A single structured log entry (info/warning/failure) surfaced to the UI."""

    # Severity of this entry.
    level: StructuredLogLevel
    # The WHAT: short category used as the displayed title.
    title: Optional[str]
    # The WHY: human-readable description shown under the title.
    message: Optional[str]
    # The WHERE + HOW: accumulated context strings for repeated occurrences
    # (a LossyList, so it may cap how many are retained).
    context: LossyList[str]
    # Optional technical stack trace for debugging.
    stacktrace: Optional[str] = None


@dataclass
class SourceReport(Report):
events_produced: int = 0
Expand All @@ -76,8 +92,35 @@ class SourceReport(Report):
default_factory=lambda: defaultdict(lambda: defaultdict(LossyList))
)

warnings: LossyDict[str, LossyList[str]] = field(default_factory=LossyDict)
failures: LossyDict[str, LossyList[str]] = field(default_factory=LossyDict)
# Underlying Lossy Dicts to Capture Errors, Warnings, and Infos.
_errors: LossyDict[str, StructuredLog] = field(
default_factory=lambda: LossyDict(10)
)
_warnings: LossyDict[str, StructuredLog] = field(
default_factory=lambda: LossyDict(10)
)
_infos: LossyDict[str, StructuredLog] = field(default_factory=lambda: LossyDict(10))

@property
def warnings(self) -> LossyList[StructuredLog]:
    """Warning-level structured logs accumulated so far, as a lossy list."""
    collected: LossyList[StructuredLog] = LossyList()
    for entry in self._warnings.values():
        collected.append(entry)
    return collected

@property
def failures(self) -> LossyList[StructuredLog]:
    """Error-level structured logs accumulated so far, as a lossy list."""
    collected: LossyList[StructuredLog] = LossyList()
    for entry in self._errors.values():
        collected.append(entry)
    return collected

@property
def infos(self) -> LossyList[StructuredLog]:
    """Info-level structured logs accumulated so far, as a lossy list."""
    collected: LossyList[StructuredLog] = LossyList()
    for entry in self._infos.values():
        collected.append(entry)
    return collected

def report_workunit(self, wu: WorkUnit) -> None:
self.events_produced += 1
Expand Down Expand Up @@ -109,28 +152,154 @@ def report_workunit(self, wu: WorkUnit) -> None:
"fineGrainedLineages"
].append(urn)

def report_warning(self, key: str, reason: str) -> None:
warnings = self.warnings.get(key, LossyList())
warnings.append(reason)
self.warnings[key] = warnings

def warning(self, key: str, reason: str) -> None:
self.report_warning(key, reason)
logger.warning(f"{key} => {reason}", stacklevel=2)

def report_failure(self, key: str, reason: str) -> None:
failures = self.failures.get(key, LossyList())
failures.append(reason)
self.failures[key] = failures

def failure(self, key: str, reason: str) -> None:
self.report_failure(key, reason)
logger.error(f"{key} => {reason}", stacklevel=2)
def report_warning(
    self,
    title: Optional[LiteralString],
    message: LiteralString,
    context: Optional[str] = None,
    stacktrace: Optional[str] = None,
) -> None:
    """
    Report a user-facing warning for the ingestion run.

    Warnings with the same title and message are deduplicated into a single
    structured log entry; each additional occurrence only contributes its
    ``context`` string.

    Parameters
    ----------
    title : Optional[str]
        The WHAT: the category of the warning, displayed as its title.
    message : str
        The WHY: why the warning was raised, displayed as the subtitle.
    context : Optional[str], optional
        The WHERE + HOW: additional context for the warning, by default None.
    stacktrace : Optional[str], optional
        Additional technical details about the failure used for debugging
    """
    log_key = f"{title}-{message}"
    if log_key in self._warnings:
        # Seen this title/message before: just accumulate the new context.
        if context is not None:
            self._warnings[log_key].context.append(context)
        return
    contexts: LossyList[str] = LossyList()
    if context is not None:
        contexts.append(context)
    self._warnings[log_key] = StructuredLog(
        level=StructuredLogLevel.WARN,
        title=title,
        message=message,
        context=contexts,
        stacktrace=stacktrace,
    )

def warning(
    self,
    title: Optional[LiteralString],
    message: LiteralString,
    context: Optional[str] = None,
    stacktrace: Optional[str] = None,
) -> None:
    """Record a structured warning and also emit it to the module logger."""
    self.report_warning(
        title=title, message=message, context=context, stacktrace=stacktrace
    )
    # stacklevel=2 attributes the log line to the caller, not this wrapper.
    logger.warning(f"{message} => {context}", stacklevel=2)

def report_failure(
    self,
    title: Optional[LiteralString],
    message: LiteralString,
    context: Optional[str] = None,
    stacktrace: Optional[str] = None,
) -> None:
    """
    Report a user-facing error for the ingestion run.

    Errors with the same title and message are deduplicated into a single
    structured log entry; each additional occurrence only contributes its
    ``context`` string.

    Parameters
    ----------
    title : Optional[str]
        The WHAT: the type of the error, displayed as its title.
    message : str
        The WHY: why the error was raised, displayed as the subtitle.
    context : Optional[str], optional
        The WHERE + HOW: additional context for the error, by default None.
    stacktrace : Optional[str], optional
        Additional technical details about the failure used for debugging
    """
    log_key = f"{title}-{message}"
    if log_key not in self._errors:
        context_list: LossyList[str] = LossyList()
        if context is not None:
            context_list.append(context)
        self._errors[log_key] = StructuredLog(
            level=StructuredLogLevel.ERROR,
            # BUG FIX: StructuredLog has no `type` field — the keyword must be
            # `title` (passing `type=` raised TypeError on the first failure).
            title=title,
            message=message,
            context=context_list,
            stacktrace=stacktrace,
        )
    else:
        if context is not None:
            self._errors[log_key].context.append(context)

def failure(
    self,
    title: Optional[LiteralString],
    message: LiteralString,
    context: Optional[str] = None,
    stacktrace: Optional[str] = None,
) -> None:
    """Record a structured failure and also emit it to the module logger."""
    self.report_failure(
        title=title, message=message, context=context, stacktrace=stacktrace
    )
    # stacklevel=2 attributes the log line to the caller, not this wrapper.
    logger.error(f"{message} => {context}", stacklevel=2)

def report_info(
    self,
    title: Optional[LiteralString],
    message: LiteralString,
    context: Optional[str] = None,
) -> None:
    """
    Report a user-facing info log for the ingestion run.

    Infos with the same title and message are deduplicated into a single
    structured log entry; each additional occurrence only contributes its
    ``context`` string.

    Parameters
    ----------
    title : Optional[str]
        The WHAT: the type of the info log, displayed as its title.
    message : str
        The WHY: the information being conveyed, displayed as the subtitle.
    context : Optional[str], optional
        The WHERE + HOW: additional context for the info, by default None.
    """
    log_key = f"{title}-{message}"
    if log_key in self._infos:
        # Seen this title/message before: just accumulate the new context.
        if context is not None:
            self._infos[log_key].context.append(context)
        return
    contexts: LossyList[str] = LossyList()
    if context is not None:
        contexts.append(context)
    self._infos[log_key] = StructuredLog(
        level=StructuredLogLevel.INFO,
        title=title,
        message=message,
        context=contexts,
    )

def info(
    self,
    title: Optional[LiteralString],
    message: LiteralString,
    context: Optional[str] = None,
) -> None:
    """Record a structured info log and also emit it to the module logger."""
    self.report_info(title=title, message=message, context=context)
    # stacklevel=2 attributes the log line to the caller, not this wrapper.
    logger.info(f"{message} => {context}", stacklevel=2)

def __post_init__(self) -> None:
    # Stamp when this report (and hence the run) started, and initialize the
    # elapsed-time counter that compute_stats() updates as the run progresses.
    self.start_time = datetime.datetime.now()
    self.running_time: datetime.timedelta = datetime.timedelta(seconds=0)

def as_obj(self) -> dict:
    """Serialize the report, materializing the computed log properties."""
    serialized = super().as_obj()
    # `infos`/`failures`/`warnings` are properties, not dataclass fields, so
    # they must be added to the serialized report explicitly.
    for section, logs in (
        ("infos", self.infos),
        ("failures", self.failures),
        ("warnings", self.warnings),
    ):
        serialized[section] = Report.to_pure_python_obj(logs)
    return serialized

def compute_stats(self) -> None:
duration = datetime.datetime.now() - self.start_time
workunits_produced = self.events_produced
Expand Down
32 changes: 14 additions & 18 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
clear_global_warnings,
get_global_warnings,
)
from datahub.utilities.lossy_collections import LossyDict, LossyList
from datahub.utilities.lossy_collections import LossyList

logger = logging.getLogger(__name__)
_REPORT_PRINT_INTERVAL_SECONDS = 60
Expand Down Expand Up @@ -124,7 +124,7 @@ class PipelineInitError(Exception):
class PipelineStatus(enum.Enum):
UNKNOWN = enum.auto()
COMPLETED = enum.auto()
PIPELINE_ERROR = enum.auto()
ERROR = enum.auto()
CANCELLED = enum.auto()


Expand Down Expand Up @@ -508,16 +508,8 @@ def run(self) -> None:
logger.error("Caught error", exc_info=e)
raise
except Exception as exc:
self.final_status = PipelineStatus.PIPELINE_ERROR
logger.exception("Ingestion pipeline threw an uncaught exception")

# HACK: We'll report this as a source error, since we don't have a great place to put it.
# It theoretically could've come from any part of the pipeline, but usually it's from the source.
# This ensures that it is included in the report, and that the run is marked as failed.
self.source.get_report().report_failure(
"pipeline_error",
f"Ingestion pipeline threw an uncaught exception: {exc}",
)
self.final_status = PipelineStatus.ERROR
self._handle_uncaught_pipeline_exception(exc)
finally:
clear_global_warnings()

Expand Down Expand Up @@ -627,11 +619,8 @@ def log_ingestion_stats(self) -> None:
self.ctx.graph,
)

def _approx_all_vals(self, d: LossyDict[str, LossyList]) -> int:
result = d.dropped_keys_count()
for k in d:
result += len(d[k])
return result
def _approx_all_vals(self, d: LossyList[Any]) -> int:
    # NOTE(review): assumes LossyList.total_elements counts every appended
    # element, including ones dropped by the lossy cap — confirm against
    # datahub.utilities.lossy_collections.
    return d.total_elements

def _get_text_color(self, running: bool, failures: bool, warnings: bool) -> str:
if running:
Expand All @@ -657,7 +646,7 @@ def pretty_print_summary(
if (
not workunits_produced
and not currently_running
and self.final_status == PipelineStatus.PIPELINE_ERROR
and self.final_status == PipelineStatus.ERROR
):
# If the pipeline threw an uncaught exception before doing anything, printing
# out the report would just be annoying.
Expand Down Expand Up @@ -725,6 +714,13 @@ def pretty_print_summary(
)
return 0

def _handle_uncaught_pipeline_exception(self, exc: Exception) -> None:
    """
    Record an uncaught pipeline exception as a source-level failure.

    HACK: the exception could have come from any stage, but there is no
    pipeline-level slot in the report, so it is attributed to the source;
    this ensures it appears in the report and the run is marked as failed.
    """
    logger.exception("Ingestion pipeline threw an uncaught exception")
    # Keep `message` a literal (report_failure declares it as LiteralString)
    # so repeated exceptions dedupe under one structured log entry; the
    # exception text goes into `context` rather than the message itself.
    self.source.get_report().report_failure(
        title="Pipeline Error",
        message="Ingestion pipeline threw an uncaught exception",
        context=f"{exc}",
    )

def _get_structured_report(self) -> Dict[str, Any]:
return {
"cli": self.cli_report.as_obj(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,9 @@ def construct_schema_metadata(
if schema_size > MAX_SCHEMA_SIZE:
# downsample the schema, using frequency as the sort key
self.report.report_warning(
key=dataset_urn,
reason=f"Downsampling the table schema because MAX_SCHEMA_SIZE threshold is {MAX_SCHEMA_SIZE}",
type="Schema Size Too Large",
message=f"Downsampling the table schema because MAX_SCHEMA_SIZE threshold is {MAX_SCHEMA_SIZE}",
context=f"Collection: {dataset_urn}",
)

# Add this information to the custom properties so user can know they are looking at down sampled schema
Expand Down Expand Up @@ -535,7 +536,9 @@ def get_native_type(self, attribute_type: Union[type, str], table_name: str) ->
)
if type_string is None:
self.report.report_warning(
table_name, f"unable to map type {attribute_type} to native data type"
type="Unable to Map Attribute Type",
message=f"Unable to map type {attribute_type} to native data type",
context=f"Collection: {table_name}",
)
return _attribute_type_to_native_type_mapping[attribute_type]
return type_string
Expand All @@ -550,8 +553,9 @@ def get_field_type(

if type_class is None:
self.report.report_warning(
table_name,
f"unable to map type {attribute_type} to metadata schema field type",
type="Unable to Map Field Type",
message=f"Unable to map type {attribute_type} to metadata schema field type",
context=f"Collection: {table_name}",
)
type_class = NullTypeClass
return SchemaFieldDataType(type=type_class())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,8 +578,8 @@ def _get_tags_from_field_type(
)
else:
reporter.report_warning(
"lookml",
f"Failed to map view field type {field.field_type}. Won't emit tags for measure and dimension",
type="Failed to Map View Field Type",
message=f"Failed to map view field type {field.field_type}. Won't emit tags for measure and dimension",
)

if schema_field_tags:
Expand Down Expand Up @@ -835,8 +835,9 @@ def from_api( # noqa: C901
potential_views.append(view_name)
except AssertionError:
reporter.report_warning(
key=f"chart-field-{field_name}",
reason="The field was not prefixed by a view name. This can happen when the field references another dynamic field.",
type="Missing View Name",
message="The field was not prefixed by a view name. This can happen when the field references another dynamic field.",
context=view_name,
)
continue

Expand Down Expand Up @@ -982,8 +983,9 @@ def from_api( # noqa: C901

except AssertionError:
reporter.report_warning(
key="chart-",
reason="Was unable to find dependent views for this chart",
type="Unable to find Views",
message="Encountered exception while attempting to find dependent views for this chart",
context=f"Explore: {explore_name}, Mode: {model}, Views: {views}",
)
return None

Expand Down
Loading
Loading