Skip to content

Commit

Permalink
Merge branch 'master' into feat/elasticsearch-optimization-ext
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Jan 9, 2023
2 parents 867390e + 432feaa commit 0548e77
Show file tree
Hide file tree
Showing 15 changed files with 58 additions and 25 deletions.
1 change: 1 addition & 0 deletions docker/kafka-setup/kafka-topic-workers.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,4 @@ for ((i=1;i<=$WORKERS;i++)); do
echo will start $i
work $i &
done

1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
### Breaking Changes

- #6742 The metadata file sink's output format no longer contains nested JSON strings for MCP aspects, but instead unpacks the stringified JSON into a real JSON object. The previous sink behavior can be recovered using the `legacy_nested_json_string` option. The file source is backwards compatible and supports both formats.
- #6901 The `env` and `database_alias` fields have been marked deprecated across all sources. We recommend using `platform_instance` where possible instead.

### Potential Downtime

Expand Down
4 changes: 4 additions & 0 deletions metadata-ingestion/src/datahub/configuration/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ class IgnorableError(MetaError):
"""An error that can be ignored."""


class ConfigurationWarning(Warning):
    """Warning category for configuration-related warnings.

    Subclasses the builtin ``Warning`` so it can be passed as the category
    argument to ``warnings.warn`` and targeted by warning filters.
    """


class ConfigurationMechanism(ABC):
@abstractmethod
def load_config(self, config_fp: IO) -> dict:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import warnings
from typing import Optional, Type

import pydantic

from datahub.configuration.common import ConfigurationWarning


def pydantic_field_deprecated(field: str, message: Optional[str] = None) -> classmethod:
    """Create a pre root validator that warns when ``field`` is present.

    Args:
        field: Name of the deprecated config key to look for in the raw
            input values.
        message: Custom warning text. When omitted (or empty), a generic
            deprecation message naming ``field`` is used.

    Returns:
        The result of applying ``pydantic.root_validator(pre=True,
        allow_reuse=True)`` to the inner check, intended to be assigned as a
        class attribute on a pydantic model.
    """
    # A falsy message (None or "") falls back to the generic text.
    warning_text = (
        message
        or f"{field} is deprecated and will be removed in a future release. Please remove it from your config."
    )

    def _validate_deprecated(cls: Type, values: dict) -> dict:
        # Only warn; the values are passed through untouched.
        if field in values:
            warnings.warn(warning_text, ConfigurationWarning, stacklevel=2)
        return values

    as_root_validator = pydantic.root_validator(pre=True, allow_reuse=True)
    return as_root_validator(_validate_deprecated)
6 changes: 6 additions & 0 deletions metadata-ingestion/src/datahub/configuration/source_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pydantic.fields import Field

from datahub.configuration.common import ConfigModel, ConfigurationError
from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated
from datahub.metadata.schema_classes import FabricTypeClass

DEFAULT_ENV = FabricTypeClass.PROD
Expand Down Expand Up @@ -39,6 +40,11 @@ class EnvBasedSourceConfigBase(ConfigModel):
description="The environment that all assets produced by this connector belong to",
)

_env_deprecation = pydantic_field_deprecated(
"env",
"env is deprecated and will be removed in a future release. Please use platform_instance instead.",
)

@validator("env")
def env_must_be_one_of(cls, v: str) -> str:
if v.upper() not in ALL_ENV_TYPES:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

import pydantic

from datahub.configuration.common import ConfigurationWarning


def pydantic_removed_field(
field: str,
Expand All @@ -13,7 +15,7 @@ def _validate_field_rename(cls: Type, values: dict) -> dict:
if print_warning:
warnings.warn(
f"The {field} was removed, please remove it from your recipe.",
UserWarning,
ConfigurationWarning,
stacklevel=2,
)
values.pop(field)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

import pydantic

from datahub.configuration.common import ConfigurationWarning

_T = TypeVar("_T")


Expand All @@ -25,8 +27,8 @@ def _validate_field_rename(cls: Type, values: dict) -> dict:
else:
if print_warning:
warnings.warn(
f"The {old_name} is deprecated, please use {new_name} instead.",
UserWarning,
f"{old_name} is deprecated, please use {new_name} instead.",
ConfigurationWarning,
stacklevel=2,
)
values[new_name] = transform(values.pop(old_name))
Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws import s3_util
from datahub.ingestion.source.aws.aws_common import AwsSourceConfig
from datahub.ingestion.source.aws.s3_util import make_s3_urn
from datahub.ingestion.source.aws.s3_util import is_s3_uri, make_s3_urn
from datahub.ingestion.source.glue_profiling_config import GlueProfilingConfig
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.sql_common_state import (
Expand Down Expand Up @@ -709,7 +709,7 @@ def get_lineage_if_enabled(
] = mce_builder.get_aspect_if_available(mce, DatasetPropertiesClass)
if dataset_properties and "Location" in dataset_properties.customProperties:
location = dataset_properties.customProperties["Location"]
if location.startswith("s3://"):
if is_s3_uri(location):
s3_dataset_urn = make_s3_urn(location, self.source_config.env)
if self.source_config.glue_s3_lineage_direction == "upstream":
upstream_lineage = UpstreamLineageClass(
Expand Down
3 changes: 0 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from datahub.configuration.kafka import KafkaConsumerConnectionConfig
from datahub.configuration.source_common import DatasetSourceConfigBase
from datahub.emitter.mce_builder import (
DEFAULT_ENV,
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
Expand Down Expand Up @@ -72,8 +71,6 @@ class KafkaTopicConfigKeys(str, Enum):


class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
env: str = DEFAULT_ENV
# TODO: inline the connection config
connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig()

topic_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"])
Expand Down
5 changes: 0 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,6 @@ def get_swagger(self) -> Dict:
return sw_dict


# class ParserWarning(UserWarning):
# def __init__(self, message: str, key: str) -> None:
# self.message


class ApiWorkUnit(MetadataWorkUnit):
pass

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import warnings

from datahub.configuration.common import ConfigurationWarning
from datahub.ingestion.api.registry import PluginRegistry
from datahub.ingestion.api.source import Source

Expand All @@ -14,14 +15,16 @@
"snowflake-beta",
"snowflake",
lambda: warnings.warn(
UserWarning("source type snowflake-beta is deprecated, use snowflake instead")
"source type snowflake-beta is deprecated, use snowflake instead",
ConfigurationWarning,
),
)
source_registry.register_alias(
"bigquery-beta",
"bigquery",
lambda: warnings.warn(
UserWarning("source type bigquery-beta is deprecated, use bigquery instead")
"source type bigquery-beta is deprecated, use bigquery instead",
ConfigurationWarning,
),
)

Expand Down
5 changes: 0 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/sql/mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@ class SQLServerConfig(BasicSQLAlchemyConfig):
description="database (catalog). If set to Null, all databases will be considered for ingestion.",
)

database_alias: Optional[str] = Field(
default=None,
description="Alias to apply to database when ingesting. Ignored when `database` is not set.",
)

@pydantic.validator("uri_args")
def passwords_match(cls, v, values, **kwargs):
if values["use_odbc"] and "driver" not in v:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from sqlalchemy.types import TypeDecorator, TypeEngine

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated
from datahub.emitter.mce_builder import (
make_container_urn,
make_data_platform_urn,
Expand Down Expand Up @@ -308,15 +309,20 @@ class BasicSQLAlchemyConfig(SQLAlchemyConfig):
description="URI of database to connect to. See https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls. Takes precedence over other connection parameters.",
)

_database_alias_deprecation = pydantic_field_deprecated(
"database_alias",
message="database_alias is deprecated. Use platform_instance instead.",
)

def get_sql_alchemy_url(self, uri_opts: Optional[Dict[str, Any]] = None) -> str:
if not ((self.host_port and self.scheme) or self.sqlalchemy_uri):
raise ValueError("host_port and schema or connect_uri required.")

return self.sqlalchemy_uri or make_sqlalchemy_uri(
self.scheme, # type: ignore
self.scheme,
self.username,
self.password.get_secret_value() if self.password is not None else None,
self.host_port, # type: ignore
self.host_port,
self.database,
uri_opts=uri_opts,
)
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/tests/unit/test_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def test_airflow_provider_info():
assert get_provider_info()


@pytest.mark.filterwarnings("ignore:.*is deprecated.*")
def test_dags_load_with_no_errors(pytestconfig: pytest.Config) -> None:
airflow_examples_folder = (
pytestconfig.rootpath / "src/datahub_provider/example_dags"
Expand Down
6 changes: 3 additions & 3 deletions metadata-ingestion/tests/unit/test_plugin_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pytest

from datahub.configuration.common import ConfigurationError
from datahub.configuration.common import ConfigurationError, ConfigurationWarning
from datahub.ingestion.api.registry import PluginRegistry
from datahub.ingestion.api.sink import Sink
from datahub.ingestion.extractor.extractor_registry import extractor_registry
Expand Down Expand Up @@ -97,9 +97,9 @@ class DummyClass:
"console-alias",
"console",
lambda: warnings.warn(
UserWarning("console-alias is deprecated, use console instead")
ConfigurationWarning("console-alias is deprecated, use console instead")
),
)
with pytest.warns(UserWarning):
with pytest.warns(ConfigurationWarning):
assert fake_registry.get("console-alias") == ConsoleSink
assert "console-alias" not in fake_registry.summary(verbose=False)

0 comments on commit 0548e77

Please sign in to comment.