Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(azuredevops): restranformate without api client (#8035) #8039

Merged
merged 2 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions backend/python/pydevlake/pydevlake/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def test_connection(self, _, connection: dict):
return self._plugin.test_connection(connection)

@plugin_method
def make_pipeline(self, _, scope_config_pairs: list[tuple[dict, dict]], connection: dict):
def make_pipeline(self, _, scope_config_pairs: list[tuple[dict, dict]], connection: dict,skip_collectors: bool):
connection = self._plugin.connection_type(**connection)
scope_config_pairs = [
(
Expand All @@ -102,7 +102,7 @@ def make_pipeline(self, _, scope_config_pairs: list[tuple[dict, dict]], connecti
)
for raw_scope, raw_config in scope_config_pairs
]
return self._plugin.make_pipeline(scope_config_pairs, connection)
return self._plugin.make_pipeline(scope_config_pairs, connection, skip_collectors)

@plugin_method
def plugin_info(self, _):
Expand Down
50 changes: 27 additions & 23 deletions backend/python/pydevlake/pydevlake/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,23 @@
# limitations under the License.


from typing import Type, Union, Iterable, Optional
from abc import ABC, abstractmethod
from pathlib import Path
import os
import sys
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Type, Union, Iterable, Optional

import fire

import pydevlake.message as msg
import pydevlake.model_info
from pydevlake.subtasks import Subtask
from pydevlake.logger import logger
from pydevlake.ipc import PluginCommands
from pydevlake.context import Context
from pydevlake.stream import Stream
from pydevlake.model import ToolScope, DomainScope, Connection, ScopeConfig, raw_data_params
from pydevlake.ipc import PluginCommands
from pydevlake.logger import logger
from pydevlake.migration import MIGRATION_SCRIPTS

from pydevlake.model import ToolScope, DomainScope, Connection, ScopeConfig, raw_data_params
from pydevlake.stream import Stream
from pydevlake.subtasks import Subtask

ScopeConfigPair = tuple[ToolScope, ScopeConfig]

Expand Down Expand Up @@ -119,20 +118,20 @@ def make_remote_scopes(self, connection: Connection, group_id: Optional[str] = N
tool_scope.raw_data_params = raw_data_params(connection.id, tool_scope.id)
tool_scope.raw_data_table = self._raw_scope_table_name()
yield msg.RemoteScope(
id=tool_scope.id,
parent_id=group_id,
name=tool_scope.name,
data=tool_scope
)
id=tool_scope.id,
parent_id=group_id,
name=tool_scope.name,
data=tool_scope
)
else:
yield from self.remote_scope_groups(connection)

def make_pipeline(self, scope_config_pairs: list[ScopeConfigPair],
connection: Connection) -> msg.PipelineData:
connection: Connection, skip_collectors: bool) -> msg.PipelineData:
"""
Make a simple pipeline using the scopes declared by the plugin.
"""
plan = self.make_pipeline_plan(scope_config_pairs, connection)
plan = self.make_pipeline_plan(scope_config_pairs, connection, skip_collectors)
domain_scopes = []
for tool_scope, _ in scope_config_pairs:
for scope in self.domain_scopes(tool_scope):
Expand All @@ -151,13 +150,14 @@ def make_pipeline(self, scope_config_pairs: list[ScopeConfigPair],
)

def make_pipeline_plan(self, scope_config_pairs: list[ScopeConfigPair],
connection: Connection) -> list[list[msg.PipelineTask]]:
connection: Connection, skip_collectors: bool) -> list[list[msg.PipelineTask]]:
"""
Generate a pipeline plan with one stage per scope, plus optional additional stages.
Redefine `extra_stages` to add stages at the end of this pipeline.
"""
return [
*(self.make_pipeline_stage(scope, config, connection) for scope, config in scope_config_pairs),
*(self.make_pipeline_stage(scope, config, connection, skip_collectors) for scope, config in
scope_config_pairs),
*self.extra_stages(scope_config_pairs, connection)
]

Expand All @@ -170,7 +170,7 @@ def extra_stages(self, scope_config_pairs: list[ScopeConfigPair],
return []

def make_pipeline_stage(self, scope: ToolScope, config: ScopeConfig,
connection: Connection) -> list[msg.PipelineTask]:
connection: Connection, skip_collectors: bool) -> list[msg.PipelineTask]:
"""
Generate a pipeline stage for the given scope, plus optional additional tasks.
Subtasks are selected from `entity_types` via `select_subtasks`.
Expand All @@ -180,7 +180,7 @@ def make_pipeline_stage(self, scope: ToolScope, config: ScopeConfig,
msg.PipelineTask(
plugin=self.name,
skip_on_fail=False,
subtasks=self.select_subtasks(scope, config),
subtasks=self.select_subtasks(scope, config, skip_collectors),
options={
"scopeId": scope.id,
"scopeName": scope.name,
Expand All @@ -196,14 +196,17 @@ def extra_tasks(self, scope: ToolScope, config: ScopeConfig,
"""Override this method to add tasks to the given scope stage"""
return []

def select_subtasks(self, scope: ToolScope, config: ScopeConfig) -> list[str]:
def select_subtasks(self, scope: ToolScope, config: ScopeConfig, skip_collectors: bool) -> list[str]:
"""
Returns the list of subtasks names that should be run for given scope and entity types.
"""
subtasks = []
for stream in self._streams.values():
if set(stream.domain_types).intersection(config.domain_types) and stream.should_run_on(scope):
for subtask in stream.subtasks:
target_subtasks = stream.subtasks
if skip_collectors:
target_subtasks = stream.subtasks_without_collector
for subtask in target_subtasks:
subtasks.append(subtask.name)
return subtasks

Expand Down Expand Up @@ -234,7 +237,8 @@ def plugin_info(self) -> msg.PluginInfo:
connection_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.connection_type),
scope_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.tool_scope_type),
scope_config_model_info=pydevlake.model_info.DynamicModelInfo.from_model(self.scope_config_type),
tool_model_infos=[pydevlake.model_info.DynamicModelInfo.from_model(stream.tool_model) for stream in self._streams.values()],
tool_model_infos=[pydevlake.model_info.DynamicModelInfo.from_model(stream.tool_model) for stream in
self._streams.values()],
subtask_metas=subtask_metas,
migration_scripts=MIGRATION_SCRIPTS
)
Expand Down
4 changes: 4 additions & 0 deletions backend/python/pydevlake/pydevlake/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ def __init__(self, plugin_name: str):
def subtasks(self):
return [self.collector, self.extractor, self.convertor]

@property
def subtasks_without_collector(self):
return [self.extractor, self.convertor]

@property
def name(self):
return type(self).__name__.lower()
Expand Down
20 changes: 11 additions & 9 deletions backend/python/pydevlake/pydevlake/testing/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from typing import Union, Type, Iterable, Generator, Optional

import pytest
from pydantic import ValidationError
from sqlmodel import create_engine

from pydevlake.context import Context
from pydevlake.plugin import Plugin
from pydevlake.message import RemoteScopeGroup, PipelineTask
from pydevlake.model import DomainModel, Connection, DomainScope, ToolModel, ToolScope, ScopeConfig, DomainType
from pydevlake.plugin import Plugin
from pydevlake.stream import Stream


Expand Down Expand Up @@ -64,8 +63,8 @@ def make_context(connection, scope, scope_config):


def assert_stream_convert(plugin: Union[Plugin, Type[Plugin]], stream_name: str,
raw: dict, expected: Union[DomainModel, Iterable[DomainModel]],
ctx=None):
raw: dict, expected: Union[DomainModel, Iterable[DomainModel]],
ctx=None):
if isinstance(plugin, type):
plugin = plugin()
stream = plugin.get_stream(stream_name)
Expand Down Expand Up @@ -178,10 +177,12 @@ def assert_valid_remote_scopes(plugin: Plugin, connection: Connection, group_id:
return tool_scopes


def assert_valid_pipeline_plan(plugin: Plugin, connection: Connection, tool_scope: ToolScope, scope_config: ScopeConfig) -> list[list[PipelineTask]]:
def assert_valid_pipeline_plan(plugin: Plugin, connection: Connection, tool_scope: ToolScope, scope_config: ScopeConfig,
skip_collectors: bool) -> list[list[PipelineTask]]:
plan = plugin.make_pipeline_plan(
[(tool_scope, scope_config)],
connection
connection,
skip_collectors,
)
assert len(plan) > 0, 'Pipeline plan has no stage'
for stage in plan:
Expand All @@ -198,14 +199,15 @@ def assert_valid_plugin(plugin: Plugin):
assert_valid_streams(plugin)


def assert_plugin_run(plugin: Plugin, connection: Connection, scope_config: Optional[ScopeConfig] = None):
def assert_plugin_run(plugin: Plugin, connection: Connection, scope_config: Optional[ScopeConfig] = None,
skip_collectors: bool = False):
assert_valid_plugin(plugin)
assert_valid_connection(plugin, connection)
groups = assert_valid_remote_scope_groups(plugin, connection)
scope = assert_valid_remote_scopes(plugin, connection, groups[0].id)[0]
assert_valid_domain_scopes(plugin, scope)
scope_config = scope_config or plugin.scope_config_type()
assert_valid_pipeline_plan(plugin, connection, scope, scope_config)
assert_valid_pipeline_plan(plugin, connection, scope, scope_config, skip_collectors)
for stream in plugin.streams:
if isinstance(stream, type):
stream = stream(plugin.name)
Expand Down
2 changes: 1 addition & 1 deletion backend/server/services/remote/plugin/plugin_extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (p remoteDatasourcePlugin) MakeDataSourcePipelinePlanV200(
}

planData := models.PipelineData{}
err = p.invoker.Call("make-pipeline", bridge.DefaultContext, toolScopeConfigPairs, connection.Unwrap()).Get(&planData)
err = p.invoker.Call("make-pipeline", bridge.DefaultContext, toolScopeConfigPairs, connection.Unwrap(), skipCollectors).Get(&planData)
if err != nil {
return nil, nil, err
}
Expand Down
Loading