Skip to content

Commit

Permalink
chore(dbt cloud): return job instead of job ID
Browse files Browse the repository at this point in the history
  • Loading branch information
betodealmeida committed Dec 21, 2023
1 parent f87cb16 commit b09de64
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/preset_cli/api/clients/superset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
>>> from yarl import URL
>>> from preset_cli.api.clients.superset import SupersetClient
>>> from preset_cli.auth.password import UsernamePasswordAuth
>>> from preset_cli.auth.superset import UsernamePasswordAuth
>>> url = URL("http://localhost:8088/")
>>> auth = UsernamePasswordAuth(url, "admin", "admin") # doctest: +SKIP
>>> client = SupersetClient(url, auth) # doctest: +SKIP
Expand Down
54 changes: 33 additions & 21 deletions src/preset_cli/cli/superset/sync/dbt/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import yaml
from yarl import URL

from preset_cli.api.clients.dbt import DBTClient, MetricSchema, ModelSchema
from preset_cli.api.clients.dbt import DBTClient, JobSchema, MetricSchema, ModelSchema
from preset_cli.api.clients.superset import SupersetClient
from preset_cli.auth.token import TokenAuth
from preset_cli.cli.superset.sync.dbt.databases import sync_database
Expand Down Expand Up @@ -282,11 +282,12 @@ def get_project_id(client: DBTClient, account_id: Optional[int] = None) -> int:
click.echo("Invalid choice")


def get_job_id(
def get_job(
client: DBTClient,
account_id: Optional[int] = None,
project_id: Optional[int] = None,
) -> int:
job_id: Optional[int] = None,
) -> JobSchema:
"""
Prompt users for a job ID.
"""
Expand All @@ -299,21 +300,29 @@ def get_job_id(
if not jobs:
click.echo(click.style("No jobs available", fg="bright_red"))
sys.exit(1)
if len(jobs) == 1:
return jobs[0]["id"]

click.echo("Choose a job:")
for i, job in enumerate(jobs):
click.echo(f'({i+1}) {job["name"]}')
if job_id is None:
if len(jobs) == 1:
return jobs[0]

while True:
try:
choice = int(input("> "))
except Exception: # pylint: disable=broad-except
choice = -1
if 0 < choice <= len(jobs):
return jobs[choice - 1]["id"]
click.echo("Invalid choice")
click.echo("Choose a job:")
for i, job in enumerate(jobs):
click.echo(f'({i+1}) {job["name"]}')

while True:
try:
choice = int(input("> "))
except Exception: # pylint: disable=broad-except
choice = -1
if 0 < choice <= len(jobs):
return jobs[choice - 1]
click.echo("Invalid choice")

for job in jobs:
if job["id"] == job_id:
return job

raise ValueError(f"Job {job_id} not available")


@click.command()
Expand Down Expand Up @@ -407,11 +416,14 @@ def dbt_cloud( # pylint: disable=too-many-arguments, too-many-locals
reload_columns = not (preserve_columns or preserve_metadata or merge_metadata)
preserve_metadata = preserve_columns if preserve_columns else preserve_metadata

if job_id is None:
job_id = get_job_id(dbt_client)
try:
job = get_job(dbt_client, job_id=job_id)
except ValueError:
click.echo(click.style(f"Job {job_id} not available", fg="bright_red"))
sys.exit(2)

# with dbt cloud the database must already exist
database_name = dbt_client.get_database_name(job_id)
database_name = dbt_client.get_database_name(job["id"])
databases = superset_client.get_databases(database_name=database_name)
if not databases:
click.echo(f'No database named "{database_name}" was found')
Expand All @@ -422,13 +434,13 @@ def dbt_cloud( # pylint: disable=too-many-arguments, too-many-locals
# need to get the database by itself so the response has the SQLAlchemy URI
database = superset_client.get_database(databases[0]["id"])

models = dbt_client.get_models(job_id)
models = dbt_client.get_models(job["id"])
models = apply_select(models, select, exclude)
model_map = {
ModelKey(model["schema"], model["name"]): f"ref('{model['name']}')"
for model in models
}
metrics = dbt_client.get_metrics(job_id)
metrics = dbt_client.get_metrics(job["id"])

if exposures_only:
datasets = [
Expand Down
89 changes: 82 additions & 7 deletions tests/cli/superset/sync/dbt/command_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from preset_cli.cli.superset.main import superset_cli
from preset_cli.cli.superset.sync.dbt.command import (
get_account_id,
get_job_id,
get_job,
get_project_id,
)
from preset_cli.exceptions import DatabaseNotFoundError
Expand Down Expand Up @@ -960,6 +960,10 @@ def test_dbt_cloud(mocker: MockerFixture) -> None:
sync_datasets = mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.sync_datasets",
)
mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.get_job",
return_value={"id": 123, "name": "My job"},
)

dbt_client.get_models.return_value = dbt_cloud_models
dbt_client.get_metrics.return_value = dbt_cloud_metrics
Expand Down Expand Up @@ -1008,6 +1012,10 @@ def test_dbt_cloud_preserve_metadata(mocker: MockerFixture) -> None:
sync_datasets = mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.sync_datasets",
)
mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.get_job",
return_value={"id": 123, "name": "My job"},
)

dbt_client.get_models.return_value = dbt_cloud_models
dbt_client.get_metrics.return_value = dbt_cloud_metrics
Expand Down Expand Up @@ -1057,6 +1065,10 @@ def test_dbt_cloud_preserve_columns(mocker: MockerFixture) -> None:
sync_datasets = mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.sync_datasets",
)
mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.get_job",
return_value={"id": 123, "name": "My job"},
)

dbt_client.get_models.return_value = dbt_cloud_models
dbt_client.get_metrics.return_value = dbt_cloud_metrics
Expand Down Expand Up @@ -1106,6 +1118,10 @@ def test_dbt_cloud_merge_metadata(mocker: MockerFixture) -> None:
sync_datasets = mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.sync_datasets",
)
mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.get_job",
return_value={"id": 123, "name": "My job"},
)

dbt_client.get_models.return_value = dbt_cloud_models
dbt_client.get_metrics.return_value = dbt_cloud_metrics
Expand Down Expand Up @@ -1282,32 +1298,43 @@ def test_get_project_id(mocker: MockerFixture) -> None:
client.get_projects.assert_called_with(42)


def test_get_job_id(mocker: MockerFixture) -> None:
def test_get_job(mocker: MockerFixture) -> None:
"""
Test the ``get_job_id`` helper.
Test the ``get_job`` helper.
"""
client = mocker.MagicMock()

client.get_jobs.return_value = []
with pytest.raises(SystemExit) as excinfo:
get_job_id(client, account_id=42, project_id=43)
get_job(client, account_id=42, project_id=43)
assert excinfo.type == SystemExit
assert excinfo.value.code == 1

client.get_jobs.return_value = [
{"id": 1, "name": "My job"},
]
assert get_job_id(client, account_id=42, project_id=43) == 1
assert get_job(client, account_id=42, project_id=43) == {"id": 1, "name": "My job"}

client.get_jobs.return_value = [
{"id": 1, "name": "My job"},
{"id": 3, "name": "My other job"},
]
assert get_job(client, account_id=42, project_id=43, job_id=3) == {
"id": 3,
"name": "My other job",
}
with pytest.raises(ValueError) as excinfo:
get_job(client, account_id=42, project_id=43, job_id=2)
assert str(excinfo.value) == "Job 2 not available"

mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.input",
side_effect=["invalid", "2"],
)
assert get_job_id(client, account_id=42, project_id=43) == 3
assert get_job(client, account_id=42, project_id=43) == {
"id": 3,
"name": "My other job",
}

mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.get_account_id",
Expand All @@ -1316,7 +1343,7 @@ def test_get_job_id(mocker: MockerFixture) -> None:
client.get_jobs.return_value = [
{"id": 1, "name": "My job"},
]
assert get_job_id(client, project_id=43) == 1
assert get_job(client, project_id=43) == {"id": 1, "name": "My job"}
client.get_jobs.assert_called_with(42, 43)


Expand All @@ -1335,6 +1362,10 @@ def test_dbt_cloud_no_database(mocker: MockerFixture) -> None:
dbt_client = DBTClient()
dbt_client.get_database_name.return_value = "my_db"
superset_client.get_databases.return_value = []
mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.get_job",
return_value={"id": 123, "name": "My job"},
)

runner = CliRunner()
result = runner.invoke(
Expand All @@ -1352,6 +1383,42 @@ def test_dbt_cloud_no_database(mocker: MockerFixture) -> None:
assert result.output == 'No database named "my_db" was found\n'


def test_dbt_cloud_invalid_job_id(mocker: MockerFixture) -> None:
"""
Test the ``dbt-cloud`` command when an invalid job ID is passed.
"""
SupersetClient = mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.SupersetClient",
)
superset_client = SupersetClient()
mocker.patch("preset_cli.cli.superset.main.UsernamePasswordAuth")
DBTClient = mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.DBTClient",
)
dbt_client = DBTClient()
dbt_client.get_database_name.return_value = "my_db"
superset_client.get_databases.return_value = []
mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.get_job",
side_effect=ValueError("Job 123 not available"),
)

runner = CliRunner()
result = runner.invoke(
superset_cli,
[
"https://superset.example.org/",
"sync",
"dbt-cloud",
"XXX",
"123",
],
catch_exceptions=False,
)
assert result.exit_code == 2
assert result.output == "Job 123 not available\n"


def test_dbt_cloud_multiple_databases(mocker: MockerFixture) -> None:
"""
Test the ``dbt-cloud`` command when multiple databases are found.
Expand All @@ -1373,6 +1440,10 @@ def test_dbt_cloud_multiple_databases(mocker: MockerFixture) -> None:
mocker.MagicMock(),
mocker.MagicMock(),
]
mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.get_job",
return_value={"id": 123, "name": "My job"},
)

runner = CliRunner()
with pytest.raises(Exception) as excinfo:
Expand Down Expand Up @@ -1479,6 +1550,10 @@ def test_dbt_cloud_exposures_only(mocker: MockerFixture, fs: FakeFilesystem) ->
sync_exposures = mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.sync_exposures",
)
mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.get_job",
return_value={"id": 123, "name": "My job"},
)

dbt_client.get_models.return_value = dbt_cloud_models
dbt_client.get_metrics.return_value = dbt_cloud_metrics
Expand Down

0 comments on commit b09de64

Please sign in to comment.