Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#2020 #2151 - fixes dbt 1.8.6 and arrow dict types #2175

Merged
merged 4 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dlt/common/libs/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ def get_column_type_from_py_arrow(dtype: pyarrow.DataType) -> TColumnType:
return dict(data_type="decimal", precision=dtype.precision, scale=dtype.scale)
elif pyarrow.types.is_nested(dtype):
return dict(data_type="json")
elif pyarrow.types.is_dictionary(dtype):
# Dictionary types are essentially categorical encodings. The underlying value_type
# dictates the "logical" type. We simply delegate to the underlying value_type.
return get_column_type_from_py_arrow(dtype.value_type)
else:
raise ValueError(dtype)

Expand Down
16 changes: 14 additions & 2 deletions dlt/common/runners/stdout.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ def iter_std(
Use -u in script_args for unbuffered Python execution
"""
with venv.start_command(
command, *script_args, stdout=PIPE, stderr=PIPE, bufsize=1, text=True
command,
*script_args,
stdout=PIPE,
stderr=PIPE,
bufsize=1,
text=True,
errors="backslashreplace",
) as process:
exit_code: int = None
q_: queue.Queue[Tuple[OutputStdStreamNo, str]] = queue.Queue()
Expand Down Expand Up @@ -72,7 +78,13 @@ def _r_q(std_: OutputStdStreamNo) -> None:
def iter_stdout(venv: Venv, command: str, *script_args: Any) -> Iterator[str]:
# start a process in virtual environment, assume that text comes from stdout
with venv.start_command(
command, *script_args, stdout=PIPE, stderr=PIPE, bufsize=1, text=True
command,
*script_args,
stdout=PIPE,
stderr=PIPE,
bufsize=1,
text=True,
errors="backslashreplace",
) as process:
exit_code: int = None
line = ""
Expand Down
16 changes: 12 additions & 4 deletions dlt/common/runners/venv.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,18 @@ def run_command(self, entry_point: str, *script_args: Any) -> str:
# runs one of installed entry points typically CLIs coming with packages and installed into PATH
command = os.path.join(self.context.bin_path, entry_point)
cmd = [command, *script_args]
return subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True)
return subprocess.check_output(
cmd, stderr=subprocess.STDOUT, text=True, errors="backslashreplace"
)

def run_script(self, script_path: str, *script_args: Any) -> str:
"""Runs a python `script` source with specified `script_args`. Current `os.environ` and cwd is passed to executed process"""
# os.environ is passed to executed process
cmd = [self.context.env_exe, os.path.abspath(script_path), *script_args]
try:
return subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True)
return subprocess.check_output(
cmd, stderr=subprocess.STDOUT, text=True, errors="backslashreplace"
)
except subprocess.CalledProcessError as cpe:
if cpe.returncode == 2:
raise FileNotFoundError(script_path)
Expand All @@ -115,7 +119,9 @@ def run_script(self, script_path: str, *script_args: Any) -> str:
def run_module(self, module: str, *module_args: Any) -> str:
"""Runs a python `module` with specified `module_args`. Current `os.environ` and cwd is passed to executed process"""
cmd = [self.context.env_exe, "-m", module, *module_args]
return subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True)
return subprocess.check_output(
cmd, stderr=subprocess.STDOUT, text=True, errors="backslashreplace"
)

def add_dependencies(self, dependencies: List[str] = None) -> None:
Venv._install_deps(self.context, dependencies)
Expand All @@ -134,7 +140,9 @@ def _install_deps(context: types.SimpleNamespace, dependencies: List[str]) -> No
cmd = [context.env_exe, "-Im", Venv.PIP_TOOL, "install"]

try:
subprocess.check_output(cmd + dependencies, stderr=subprocess.STDOUT)
subprocess.check_output(
cmd + dependencies, stderr=subprocess.STDOUT, errors="backslashreplace"
)
except subprocess.CalledProcessError as exc:
raise CannotInstallDependencies(dependencies, context.env_exe, exc.output)

Expand Down
32 changes: 10 additions & 22 deletions dlt/helpers/dbt/dbt_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,16 @@

# can only import DBT after redirect is disabled
# https://stackoverflow.com/questions/48619517/call-a-click-command-from-code
except ImportError:
pass

try:
import dbt.logger
from dbt.contracts import results as dbt_results
except ModuleNotFoundError:
raise MissingDependencyException("DBT Core", ["dbt-core"])

try:
# dbt <1.5
from dbt.main import handle_and_check # type: ignore[import-not-found]
except ImportError:
# dbt >=1.5
from dbt.cli.main import dbtRunner

try:
from dbt.exceptions import FailFastException # type: ignore
from dbt.exceptions import FailFastError
except ImportError:
from dbt.exceptions import FailFastError as FailFastException
raise MissingDependencyException("DBT Core", ["dbt-core"])

_DBT_LOGGER_INITIALIZED = False

Expand Down Expand Up @@ -135,15 +128,10 @@ def run_dbt_command(
runner_args = (global_args or []) + [command] + args # type: ignore

with dbt.logger.log_manager.applicationbound():
try:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like with this change we're not supporting dbt<1.5 anymore. We could write a note in the docs that 1.5.2 (looking at test_runner_dbt_versions.py) is the minimum tested/supported version.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right! In fact, we have not supported dbt <1.5 for a long time already...

# dbt 1.5
runner = dbtRunner()
run_result = runner.invoke(runner_args)
success = run_result.success
results = run_result.result # type: ignore
except NameError:
# dbt < 1.5
results, success = handle_and_check(runner_args)
runner = dbtRunner()
run_result = runner.invoke(runner_args)
success = run_result.success
results = run_result.result # type: ignore

assert type(success) is bool
parsed_results = parse_dbt_execution_results(results)
Expand All @@ -157,7 +145,7 @@ def run_dbt_command(
except SystemExit as sys_ex:
# oftentimes dbt tries to exit on error
raise DBTProcessingError(command, None, sys_ex)
except FailFastException as ff:
except FailFastError as ff:
dbt_exc = DBTProcessingError(command, parse_dbt_execution_results(ff.result), ff.result)
# detect incremental model out of sync
if is_incremental_schema_out_of_sync_error(ff.result):
Expand Down
4 changes: 2 additions & 2 deletions docs/website/docs/general-usage/credentials/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ keywords: [credentials, secrets.toml, secrets, config, configuration, environmen
---
import DocCardList from '@theme/DocCardList';

`dlt` pipelines usually require configurations and credentials. These can be set up in [various ways](setup):
`dlt` pipelines usually require configurations and credentials. These can be set up in [various ways](./setup):

1. Environment variables
2. Configuration files (`secrets.toml` and `config.toml`)
3. Key managers and vaults

`dlt` automatically extracts configuration settings and secrets based on flexible [naming conventions](setup/#naming-convention). It then [injects](advanced/#injection-mechanism) these values where needed in code.
`dlt` automatically extracts configuration settings and secrets based on flexible [naming conventions](./setup/#naming-convention). It then [injects](./advanced/#injection-mechanism) these values where needed in code.

# Learn details about

Expand Down
4 changes: 2 additions & 2 deletions tests/helpers/dbt_tests/test_runner_dbt_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ def client() -> Iterator[PostgresClient]:
PACKAGE_PARAMS = [
("postgres", "1.5.2"),
("postgres", "1.6.13"),
("postgres", "1.8.1"),
("postgres", "1.8.6"),
("postgres", None),
("snowflake", "1.5.2"),
("snowflake", "1.6.13"),
("snowflake", "1.8.1"),
("snowflake", "1.8.6"),
("snowflake", None),
]
PACKAGE_IDS = [
Expand Down
12 changes: 12 additions & 0 deletions tests/libs/pyarrow/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ def test_py_arrow_to_table_schema_columns():
assert result == dlt_schema


def test_py_arrow_dict_to_column() -> None:
    """Check that dictionary-encoded arrow columns resolve to the dlt type of their value type."""
    # dictionary (categorical) types with int8 indices over string and int64 values
    string_dict_type = pa.dictionary(pa.int8(), pa.string())
    int_dict_type = pa.dictionary(pa.int8(), pa.int64())
    table = pa.table(
        {
            "strings": pa.array(["a", "b", "c"], type=string_dict_type),
            "ints": pa.array([1, 2, 3], type=int_dict_type),
        }
    )

    # the schema conversion must report the value type, not the dictionary wrapper
    expected_columns = {
        "strings": {"name": "strings", "nullable": True, "data_type": "text"},
        "ints": {"name": "ints", "nullable": True, "data_type": "bigint"},
    }
    assert py_arrow_to_table_schema_columns(table.schema) == expected_columns

    # the data itself still decodes to the plain python values
    expected_data = {"strings": ["a", "b", "c"], "ints": [1, 2, 3]}
    assert table.to_pydict() == expected_data


def test_to_arrow_scalar() -> None:
naive_dt = get_py_arrow_timestamp(6, tz=None)
# print(naive_dt)
Expand Down
Loading