Skip to content

Commit

Permalink
fix some bugs
Browse files Browse the repository at this point in the history
rpc, ls, some import tweaks
  • Loading branch information
Jacob Beck committed Jul 15, 2020
1 parent 89443ea commit 97c59f3
Show file tree
Hide file tree
Showing 16 changed files with 86 additions and 48 deletions.
2 changes: 1 addition & 1 deletion core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
)
from dbt.contracts.graph.parsed import ParsedNode
from dbt.exceptions import dependency_not_found, InternalException
from dbt.graph.graph import Graph
from dbt.graph import Graph
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.node_types import NodeType
from dbt.utils import add_ephemeral_model_prefix, pluralize
Expand Down
5 changes: 3 additions & 2 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from multiprocessing.synchronize import Lock
from typing import (
Dict, List, Optional, Union, Mapping, MutableMapping, Any, Set, Tuple,
TypeVar, Callable, Iterable, Generic
TypeVar, Callable, Iterable, Generic, cast
)
from typing_extensions import Protocol
from uuid import UUID
Expand Down Expand Up @@ -622,7 +622,8 @@ def sync_update_node(
with self._lock:
existing = self.nodes[new_node.unique_id]
if getattr(existing, 'compiled', False):
return existing
# already compiled -> must be a NonSourceCompiledNode
return cast(NonSourceCompiledNode, existing)
_update_into(self.nodes, new_node)
return new_node

Expand Down
14 changes: 14 additions & 0 deletions core/dbt/graph/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from .selector_spec import ( # noqa: F401
SelectionUnion,
SelectionSpec,
SelectionIntersection,
SelectionDifference,
SelectionCriteria,
)
from .selector import ( # noqa: F401
ResourceTypeSelector,
NodeSelector,
)
from .cli import parse_difference # noqa: F401
from .queue import GraphQueue # noqa: F401
from .graph import Graph # noqa: F401
4 changes: 1 addition & 3 deletions core/dbt/graph/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
)
import networkx as nx # type: ignore

from dbt.contracts.graph.manifest import Manifest
from dbt.exceptions import RuntimeException, InternalException
from dbt.node_types import NodeType
from dbt.exceptions import InternalException

# it would be nice to use a NewType for this, but that will cause problems with
# str interop, which dbt relies on implicitly all over.
Expand Down
56 changes: 42 additions & 14 deletions core/dbt/graph/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def alert_non_existence(raw_spec, nodes):


class InvalidSelectorError(Exception):
# this internal exception should never escape the module.
pass


Expand All @@ -53,10 +54,10 @@ def __init__(
self.manifest = manifest

@classmethod
def register_method(cls, name, selector: Type[SelectorMethod]):
cls.SELECTOR_METHODS[name] = selector
def register_method(cls, name: MethodName, method: Type[SelectorMethod]):
cls.SELECTOR_METHODS[name] = method

def get_selector(self, method: str) -> SelectorMethod:
def get_method(self, method: str) -> SelectorMethod:
if method in self.SELECTOR_METHODS:
cls: Type[SelectorMethod] = self.SELECTOR_METHODS[method]
return cls(self.manifest)
Expand All @@ -66,12 +67,21 @@ def get_selector(self, method: str) -> SelectorMethod:
def select_included(
self, included_nodes: Set[str], spec: SelectionCriteria,
) -> Set[str]:
selector = self.get_selector(spec.method)
return set(selector.search(included_nodes, spec.value))
"""Select the explicitly included nodes, using the given spec. Return
the selected set of unique IDs.
"""
method = self.get_method(spec.method)
return set(method.search(included_nodes, spec.value))

def get_nodes_from_criteria(
self, graph: Graph, spec: SelectionCriteria
) -> Set[str]:
"""Given a Graph, get all nodes specified by the spec.
- collect the directly included nodes
- find their specified relatives
- perform any selector-specific expansion
"""
nodes = graph.nodes()
try:
collected = self.select_included(nodes, spec)
Expand All @@ -83,14 +93,18 @@ def get_nodes_from_criteria(
)
return set()

specified = self.collect_models(spec, graph, collected)
collected.update(specified)
result = self.expand_selection(graph, collected)
extras = self.collect_specified_neighbors(spec, graph, collected)
result = self.expand_selection(graph, collected | extras)
return result

def collect_models(
def collect_specified_neighbors(
self, spec: SelectionCriteria, graph: Graph, selected: Set[UniqueId]
) -> Set[UniqueId]:
"""Given the set of models selected by the explicit part of the
selector (like "tag:foo"), apply the modifiers on the spec ("+"/"@").
Return the set of additional nodes that should be collected (which may
overlap with the selected set).
"""
additional: Set[UniqueId] = set()
if spec.select_childrens_parents:
additional.update(graph.select_childrens_parents(selected))
Expand All @@ -104,9 +118,13 @@ def collect_models(
)
return additional

def select_nodes(
self, graph: Graph, spec: SelectionSpec
) -> Set[str]:
def select_nodes(self, graph: Graph, spec: SelectionSpec) -> Set[str]:
"""Select the nodes in the graph according to the spec.
If the spec is a composite spec (a union, difference, or intersection),
recurse into its selections and combine them. If the spec is a concrete
selection criteria, resolve that using the given graph.
"""
if isinstance(spec, SelectionCriteria):
result = self.get_nodes_from_criteria(graph, spec)
else:
Expand All @@ -133,6 +151,9 @@ def node_is_match(
self,
node: Union[ParsedSourceDefinition, NonSourceNode],
) -> bool:
"""Determine if a node is a match for the selector. Non-match nodes
will be excluded from results during filtering.
"""
return True

def _is_match(self, unique_id: str) -> bool:
Expand All @@ -148,23 +169,30 @@ def _is_match(self, unique_id: str) -> bool:
return self.node_is_match(node)

def build_graph_member_subgraph(self) -> Graph:
"""Build a subgraph of all enabled, non-empty nodes based on the full
graph.
"""
graph_members = {
unique_id for unique_id in self.full_graph.nodes()
if self._is_graph_member(unique_id)
}
return self.full_graph.subgraph(graph_members)

def filter_selection(self, selected: Set[str]) -> Set[str]:
"""Return the subset of selected nodes that is a match for this
selector.
"""
return {
unique_id for unique_id in selected if self._is_match(unique_id)
}

def expand_selection(
self, filtered_graph: Graph, selected: Set[str]
) -> Set[str]:
"""Perform selector-specific expansion."""
return selected

def get_selected(self, spec: SelectionCriteria) -> Set[str]:
def get_selected(self, spec: SelectionSpec) -> Set[str]:
"""get_selected runs through the node selection process:
- build a subgraph containing only non-empty, enabled nodes and
Expand All @@ -183,7 +211,7 @@ def get_selected(self, spec: SelectionCriteria) -> Set[str]:
filtered_nodes = self.filter_selection(selected_nodes)
return filtered_nodes

def get_graph_queue(self, spec: SelectionCriteria) -> GraphQueue:
def get_graph_queue(self, spec: SelectionSpec) -> GraphQueue:
"""Returns a queue over nodes in the graph that tracks progress of
dependencies.
"""
Expand Down
7 changes: 2 additions & 5 deletions core/dbt/task/compile.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
from typing import List

from .runnable import GraphRunnableTask
from .base import BaseRunner

from dbt.compilation import compile_node
from dbt.contracts.results import RunModelResult
from dbt.exceptions import InternalException
from dbt.graph.selector import ResourceTypeSelector
from dbt.graph.cli import parse_difference
from dbt.graph import ResourceTypeSelector, SelectionSpec, parse_difference
from dbt.logger import print_timestamped_line
from dbt.node_types import NodeType

Expand All @@ -30,7 +27,7 @@ class CompileTask(GraphRunnableTask):
def raise_on_first_error(self):
return True

def get_selection_spec(self) -> List[str]:
def get_selection_spec(self) -> SelectionSpec:
spec = parse_difference(self.args.models, self.args.exclude)
return spec

Expand Down
7 changes: 3 additions & 4 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import threading
import time
from typing import Dict, List
from typing import Dict

from .base import BaseRunner
from .printer import (
Expand All @@ -22,8 +22,7 @@

from dbt import utils

from dbt.graph.selector import NodeSelector
from dbt.graph.cli import parse_difference
from dbt.graph import NodeSelector, SelectionSpec, parse_difference
from dbt.contracts.graph.parsed import ParsedSourceDefinition


Expand Down Expand Up @@ -125,7 +124,7 @@ def result_path(self):
def raise_on_first_error(self):
return False

def get_selection_spec(self) -> List[str]:
def get_selection_spec(self) -> SelectionSpec:
include = [
'source:{}'.format(s)
for s in (self.args.selected or ['*'])
Expand Down
10 changes: 5 additions & 5 deletions core/dbt/task/list.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import json
from typing import Type, List
from typing import Type

from dbt.graph.selector import ResourceTypeSelector
from dbt.graph.cli import parse_difference
from dbt.graph import ResourceTypeSelector, parse_difference, SelectionSpec
from dbt.task.runnable import GraphRunnableTask, ManifestTask
from dbt.task.test import TestSelector
from dbt.node_types import NodeType
Expand Down Expand Up @@ -54,7 +53,8 @@ def pre_init_hook(cls, args):

def _iterate_selected_nodes(self):
selector = self.get_node_selector()
nodes = sorted(selector.get_selected())
spec = self.get_selection_spec()
nodes = sorted(selector.get_selected(spec))
if not nodes:
logger.warning('No nodes selected!')
return
Expand Down Expand Up @@ -144,7 +144,7 @@ def selector(self):
else:
return self.args.select

def get_selection_spec(self) -> List[str]:
def get_selection_spec(self) -> SelectionSpec:
spec = parse_difference(self.selector, self.args.exclude)
return spec

Expand Down
6 changes: 6 additions & 0 deletions core/dbt/task/rpc/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ def handle_request(self) -> Result:
# we parsed args from the cli, so we're set on that front
return self.real_task.handle_request()

def get_selection_spec(self):
return self.real_task.get_selection_spec()

def get_node_selector(self):
return self.real_task.get_node_selector()

def interpret_results(self, results):
if self.real_task is None:
# I don't know what happened, but it was surely some flavor of
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/task/rpc/sql_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from dbt import flags
from dbt.adapters.factory import get_adapter
from dbt.clients.jinja import extract_toplevel_blocks
from dbt.compilation import compile_manifest, compile_node
from dbt.compilation import compile_manifest
from dbt.config.runtime import RuntimeConfig
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.parsed import ParsedRPCNode
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
DbtModelState,
print_timestamped_line,
)
from dbt.graph.selector import ResourceTypeSelector
from dbt.graph import ResourceTypeSelector
from dbt.hooks import get_hook_dict
from dbt.node_types import NodeType, RunHookType

Expand Down
6 changes: 2 additions & 4 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@
RuntimeException,
FailFastException
)
from dbt.graph.graph import Graph
from dbt.graph.queue import GraphQueue
from dbt.graph.selector import NodeSelector
from dbt.graph import GraphQueue, NodeSelector, SelectionSpec, Graph
from dbt.perf_utils import get_full_manifest

import dbt.exceptions
Expand Down Expand Up @@ -95,7 +93,7 @@ def index_offset(self, value: int) -> int:
return value

@abstractmethod
def get_selection_spec(self) -> List[str]:
def get_selection_spec(self) -> SelectionSpec:
raise NotImplementedException(
f'get_selection_spec not implemented for task {type(self)}'
)
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/task/seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
)

from dbt.exceptions import InternalException
from dbt.graph.selector import ResourceTypeSelector
from dbt.graph import ResourceTypeSelector
from dbt.logger import GLOBAL_LOGGER as logger, TextOnly
from dbt.node_types import NodeType

Expand Down
2 changes: 1 addition & 1 deletion core/dbt/task/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from .printer import print_snapshot_result_line

from dbt.exceptions import InternalException
from dbt.graph.selector import ResourceTypeSelector
from dbt.graph import ResourceTypeSelector
from dbt.node_types import NodeType


Expand Down
6 changes: 2 additions & 4 deletions core/dbt/task/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
)
from dbt.contracts.results import RunModelResult
from dbt.exceptions import raise_compiler_error, InternalException
from dbt.graph.selector import ResourceTypeSelector, Graph
from dbt.graph import ResourceTypeSelector, Graph
from dbt.node_types import NodeType, RunHookType
from dbt import flags

Expand Down Expand Up @@ -103,14 +103,12 @@ def after_execute(self, result):

class TestSelector(ResourceTypeSelector):
def __init__(
self, graph, manifest, include, exclude, data: bool, schema: bool
self, graph, manifest, data: bool, schema: bool
):
super().__init__(
graph=graph,
manifest=manifest,
resource_types=[NodeType.Test],
include=include,
exclude=exclude,
)
self.data = data
self.schema = schema
Expand Down
3 changes: 1 addition & 2 deletions test/unit/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
from dbt.contracts.graph.manifest import FilePath, SourceFile, FileHash, Manifest
from dbt.parser.results import ParseResult
from dbt.parser.base import BaseParser
from dbt.graph.selector import NodeSelector
from dbt.graph.cli import parse_difference
from dbt.graph import NodeSelector, parse_difference

try:
from queue import Empty
Expand Down

0 comments on commit 97c59f3

Please sign in to comment.