Merge pull request #316 from nodestream-proj/scope_logger
Adding the scope to all log messages.
zprobst authored Jun 11, 2024
2 parents 4daac75 + 6378814 commit 3052ff9
Showing 7 changed files with 30 additions and 16 deletions.
4 changes: 3 additions & 1 deletion nodestream/cli/operations/initialize_logger.py
@@ -20,12 +20,14 @@ def configure_logging_with_json_defaults():
def record_factory(*args, **kwargs):
record = old_record_factory(*args, **kwargs)
record.pipeline_name = get_context().name
record.pipeline_scope = get_context().scope
return record

logging.setLogRecordFactory(record_factory)

formatter = JsonFormatter(
"%(name)s %(levelname)s %(pipeline_name)s %(message)s", timestamp=True
"%(name)s %(levelname)s %(pipeline_name)s %(pipeline_scope)s %(message)s",
timestamp=True,
)
logger = logging.getLogger() # Configure the root logger.
logger.handlers[0].setFormatter(formatter)
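For context, here is a minimal standalone sketch of the logging pattern this file now uses: a custom LogRecord factory stamps every record with a pipeline name and scope, and a JSON formatter includes both fields in every emitted line. This assumes the python-json-logger package (the source of the JsonFormatter the diff references), and the name/scope values are hard-coded for illustration; nodestream itself reads them from the PipelineContext shown in the next file.

import logging

from pythonjsonlogger.jsonlogger import JsonFormatter  # assumed dependency providing JsonFormatter

PIPELINE_NAME = "example_pipeline"   # hypothetical values; nodestream pulls these
PIPELINE_SCOPE = "example_scope"     # from get_context() rather than constants

old_record_factory = logging.getLogRecordFactory()


def record_factory(*args, **kwargs):
    # Attach the pipeline name and scope to every record so formatters can render them.
    record = old_record_factory(*args, **kwargs)
    record.pipeline_name = PIPELINE_NAME
    record.pipeline_scope = PIPELINE_SCOPE
    return record


logging.setLogRecordFactory(record_factory)

handler = logging.StreamHandler()
handler.setFormatter(
    JsonFormatter(
        "%(name)s %(levelname)s %(pipeline_name)s %(pipeline_scope)s %(message)s",
        timestamp=True,
    )
)
logging.basicConfig(level=logging.INFO, handlers=[handler])

logging.getLogger("example").info("ingest started")
# Emits roughly:
# {"name": "example", "levelname": "INFO", "pipeline_name": "example_pipeline",
#  "pipeline_scope": "example_scope", "message": "ingest started", "timestamp": "..."}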
6 changes: 4 additions & 2 deletions nodestream/pipeline/meta.py
@@ -5,11 +5,13 @@
from typing import Any, Dict

UNKNOWN_PIPELINE_NAME = "unknown"
UNKNOWN_PIPELINE_SCOPE = "unknown"


@dataclass(frozen=True, slots=True)
class PipelineContext:
name: str = UNKNOWN_PIPELINE_NAME
scope: str = UNKNOWN_PIPELINE_SCOPE
stats: Dict[str, Any] = field(default_factory=lambda: defaultdict(int))

def increment_stat(self, stat_name: str, amount: int = 1):
@@ -27,8 +29,8 @@ def get_context() -> PipelineContext:


@contextmanager
def start_context(pipeline_name: str):
token = context.set(PipelineContext(pipeline_name))
def start_context(pipeline_name: str, pipeline_scope: str):
token = context.set(PipelineContext(pipeline_name, pipeline_scope))
try:
yield
finally:
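With this change, callers supply both a pipeline name and the scope it belongs to when entering the context. A small usage sketch against the new API (the pipeline and scope names here are made up):

from nodestream.pipeline.meta import get_context, start_context

# Inside the context manager, get_context() reflects the running pipeline and its scope;
# outside it, both fall back to the "unknown" defaults.
with start_context("ingest_users", "default"):
    ctx = get_context()
    print(ctx.name, ctx.scope)  # -> ingest_users default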
4 changes: 3 additions & 1 deletion nodestream/project/pipeline_scope.py
@@ -3,6 +3,7 @@
from typing import Dict, Iterable, Optional

from ..file_io import LoadsFromYaml, SavesToYaml
from ..pipeline.meta import start_context
from ..pipeline.scope_config import ScopeConfig
from ..schema import ExpandsSchema, ExpandsSchemaFromChildren
from .pipeline_definition import PipelineConfiguration, PipelineDefinition
@@ -89,7 +90,8 @@ async def run_request(self, run_request: "RunRequest") -> int:
return 0

run_request.set_configuration(self.config)
await run_request.execute_with_definition(self[name])
with start_context(name, self.name):
await run_request.execute_with_definition(self[name])
return 1

def __getitem__(self, pipeline_name):
6 changes: 2 additions & 4 deletions nodestream/project/run_request.py
@@ -1,7 +1,6 @@
from dataclasses import dataclass

from ..pipeline import PipelineInitializationArguments
from ..pipeline.meta import start_context
from ..pipeline.progress_reporter import PipelineProgressReporter
from ..pipeline.scope_config import ScopeConfig
from .pipeline_definition import PipelineDefinition
@@ -53,9 +52,8 @@ async def execute_with_definition(self, definition: PipelineDefinition):
Args:
definition: The pipeline definition to execute this run request with.
"""
with start_context(self.pipeline_name):
pipeline = definition.initialize(self.initialization_arguments)
await pipeline.run(self.progress_reporter)
pipeline = definition.initialize(self.initialization_arguments)
await pipeline.run(self.progress_reporter)

def set_configuration(self, scope_config: ScopeConfig):
self.initialization_arguments.effecitve_config_values = scope_config
4 changes: 3 additions & 1 deletion tests/unit/cli/operations/test_initialize_logger.py
@@ -36,8 +36,10 @@ def test_logs_pipeline_name(capsys):
configure_logging_with_json_defaults()
logger = logging.getLogger("some")
pipeline_name = "test_pipeline_name"
with start_context(pipeline_name):
scope_name = "test_scope_name"
with start_context(pipeline_name, scope_name):
logger.info("some message")

captured_out = capsys.readouterr().err
assert_that(captured_out, contains_string(pipeline_name))
assert_that(captured_out, contains_string(scope_name))
8 changes: 4 additions & 4 deletions tests/unit/databases/test_query_executor_with_statistics.py
@@ -19,7 +19,7 @@ def query_executor_with_statistics(mocker):
async def test_upsert_nodes_in_bulk_with_same_operation_increments_counter_by_size_of_list(
query_executor_with_statistics,
):
with start_context("test"):
with start_context("test", "test_scope"):
await query_executor_with_statistics.upsert_nodes_in_bulk_with_same_operation(
"operation", ["node1", "node2"]
)
@@ -34,7 +34,7 @@ async def test_upsert_nodes_in_bulk_with_same_operation_increments_counter_by_si
async def test_upsert_relationships_in_bulk_of_same_operation_increments_counter_by_size_of_list(
query_executor_with_statistics,
):
with start_context("test"):
with start_context("test", "test_scope"):
await query_executor_with_statistics.upsert_relationships_in_bulk_of_same_operation(
"operation", ["relationship1", "relationship2"]
)
@@ -47,7 +47,7 @@ async def test_upsert_relationships_in_bulk_of_same_operation_increments_counter

@pytest.mark.asyncio
async def test_perform_ttl_op_increments_counter_by_one(query_executor_with_statistics):
with start_context("test"):
with start_context("test", "test_scope"):
await query_executor_with_statistics.perform_ttl_op("config")
query_executor_with_statistics.inner.perform_ttl_op.assert_awaited_once_with(
"config"
@@ -57,7 +57,7 @@ async def test_perform_ttl_op_increments_counter_by_one(query_executor_with_stat

@pytest.mark.asyncio
async def test_execute_hook_increments_counter_by_one(query_executor_with_statistics):
with start_context("test"):
with start_context("test", "test_scope"):
await query_executor_with_statistics.execute_hook("hook")
query_executor_with_statistics.inner.execute_hook.assert_awaited_once_with(
"hook"
14 changes: 11 additions & 3 deletions tests/unit/pipeline/test_meta.py
@@ -1,13 +1,21 @@
from hamcrest import assert_that, equal_to

from nodestream.pipeline.meta import UNKNOWN_PIPELINE_NAME, get_context, start_context
from nodestream.pipeline.meta import (
UNKNOWN_PIPELINE_NAME,
UNKNOWN_PIPELINE_SCOPE,
get_context,
start_context,
)


def test_get_pipeline_name_unset():
assert_that(get_context().name, equal_to(UNKNOWN_PIPELINE_NAME))
assert_that(get_context().scope, equal_to(UNKNOWN_PIPELINE_SCOPE))


def test_get_pipeline_name_set():
with start_context("test"):
def test_get_pipeline_name_and_scope_set():
with start_context("test", "test_scope"):
assert_that(get_context().name, equal_to("test"))
assert_that(get_context().scope, equal_to("test_scope"))
assert_that(get_context().name, equal_to(UNKNOWN_PIPELINE_NAME))
assert_that(get_context().scope, equal_to(UNKNOWN_PIPELINE_SCOPE))
