Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Denormalize namespace name to job table #934

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions api/src/main/java/marquez/db/JobDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public interface JobDao {
+ "created_at, "
+ "updated_at, "
+ "namespace_uuid, "
+ "namespace_name, "
+ "name, "
+ "description, "
+ "current_version_uuid"
Expand All @@ -44,6 +45,7 @@ public interface JobDao {
+ ":createdAt, "
+ ":updatedAt, "
+ ":namespaceUuid, "
+ ":namespaceName, "
+ ":name, "
+ ":description, "
+ ":currentVersionUuid)")
Expand Down Expand Up @@ -80,9 +82,8 @@ public interface JobDao {
Optional<JobRow> find(String namespaceName, String jobName);

@SqlQuery(
"SELECT j.*, n.name AS namespace_name FROM jobs AS j "
+ "INNER JOIN namespaces AS n "
+ " ON (n.name = :namespaceName AND j.namespace_uuid = n.uuid) "
"SELECT j.* FROM jobs AS j "
+ "WHERE namespace_name = :namespaceName "
+ "ORDER BY j.name "
+ "LIMIT :limit OFFSET :offset")
List<JobRow> findAll(String namespaceName, int limit, int offset);
Expand All @@ -97,6 +98,7 @@ public interface JobDao {
+ "created_at, "
+ "updated_at, "
+ "namespace_uuid, "
+ "namespace_name, "
+ "name, "
+ "description "
+ ") VALUES ( "
Expand All @@ -105,6 +107,7 @@ public interface JobDao {
+ ":now, "
+ ":now, "
+ ":namespaceUuid, "
+ ":namespaceName, "
+ ":name, "
+ ":description "
+ ") ON CONFLICT (name, namespace_uuid) DO "
Expand All @@ -114,5 +117,11 @@ public interface JobDao {
+ "description = EXCLUDED.description "
+ "RETURNING *")
JobRow upsert(
UUID uuid, JobType type, Instant now, UUID namespaceUuid, String name, String description);
UUID uuid,
JobType type,
Instant now,
UUID namespaceUuid,
String namespaceName,
String name,
String description);
}
8 changes: 3 additions & 5 deletions api/src/main/java/marquez/db/JobVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ default void updateOutputs(UUID versionUuid, UUID outputUuid) {
void updateLatestRun(UUID rowUuid, Instant updatedAt, UUID latestRunUuid);

final String EXTENDED_SELECT =
"SELECT j.namespace_uuid, jv.*, jc.uuid AS job_context_uuid, jc.context, n.name as namespace_name, j.name, "
"SELECT j.namespace_uuid, jv.*, jc.uuid AS job_context_uuid, jc.context, j.namespace_name, j.name, "
+ "ARRAY(SELECT dataset_uuid "
+ " FROM job_versions_io_mapping "
+ " WHERE job_version_uuid = jv.uuid AND "
Expand All @@ -110,8 +110,6 @@ default void updateOutputs(UUID versionUuid, UUID outputUuid) {
+ "FROM job_versions AS jv "
+ "INNER JOIN jobs AS j "
+ " ON j.uuid = jv.job_uuid "
+ "INNER JOIN namespaces AS n "
+ " ON j.namespace_uuid = n.uuid "
+ "INNER JOIN job_contexts AS jc "
+ " ON job_context_uuid = jc.uuid ";

Expand All @@ -120,7 +118,7 @@ default void updateOutputs(UUID versionUuid, UUID outputUuid) {

@SqlQuery(
EXTENDED_SELECT
+ "WHERE n.name = :namespaceName AND j.name = :jobName AND j.current_version_uuid = jv.uuid "
+ "WHERE j.namespace_name = :namespaceName AND j.name = :jobName AND j.current_version_uuid = jv.uuid "
+ "ORDER BY created_at DESC "
+ "LIMIT 1")
Optional<ExtendedJobVersionRow> findLatest(String namespaceName, String jobName);
Expand All @@ -130,7 +128,7 @@ default void updateOutputs(UUID versionUuid, UUID outputUuid) {

@SqlQuery(
EXTENDED_SELECT
+ "WHERE n.name = :namespaceName AND j.name = :jobName "
+ "WHERE j.namespace_name = :namespaceName AND j.name = :jobName "
+ "ORDER BY created_at DESC "
+ "LIMIT :limit OFFSET :offset")
List<ExtendedJobVersionRow> findAll(String namespaceName, String jobName, int limit, int offset);
Expand Down
1 change: 1 addition & 0 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event) {
getJobType(event.getJob()),
now,
namespace.getUuid(),
namespace.getName(),
event.getJob().getName(),
description);
bag.setJob(job);
Expand Down
3 changes: 1 addition & 2 deletions api/src/main/java/marquez/db/RunDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,7 @@ default void updateJobVersionUuid(UUID rowUuid, Instant updatedAt, UUID jobVersi
SELECT_RUN
+ "INNER JOIN job_versions AS jv ON r.job_version_uuid = jv.uuid "
+ "INNER JOIN jobs AS j ON jv.job_uuid = j.uuid "
+ "INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid "
+ "WHERE n.name = :namespace and j.name = :jobName "
+ "WHERE j.namespace_name = :namespace and j.name = :jobName "
+ "ORDER BY r.created_at DESC "
+ "LIMIT :limit OFFSET :offset")
List<ExtendedRunRow> findAll(String namespace, String jobName, int limit, int offset);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE jobs ADD namespace_name VARCHAR(255);
Copy link
Member

@wslulciuc wslulciuc Feb 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With PR #935 now merged, mind updating the migration version to V22 to maintain ordering?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pr #935 was branched off of this commit so this pr can be closed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

UPDATE jobs SET namespace_name = namespaces.name FROM namespaces WHERE jobs.namespace_uuid = namespaces.uuid;