From e7f2b4596b6c38554b5f53cd525e403cf46e9cc5 Mon Sep 17 00:00:00 2001 From: henneberger Date: Tue, 9 Feb 2021 12:25:27 -0800 Subject: [PATCH 1/2] Denormalize namespace name to job table Signed-off-by: henneberger --- api/src/main/java/marquez/db/JobDao.java | 11 +++++++---- api/src/main/java/marquez/db/JobVersionDao.java | 8 +++----- api/src/main/java/marquez/db/OpenLineageDao.java | 1 + api/src/main/java/marquez/db/RunDao.java | 3 +-- .../V20__alter_jobs_to_add_namespace_name.sql | 2 ++ 5 files changed, 14 insertions(+), 11 deletions(-) create mode 100644 api/src/main/resources/marquez/db/migration/V20__alter_jobs_to_add_namespace_name.sql diff --git a/api/src/main/java/marquez/db/JobDao.java b/api/src/main/java/marquez/db/JobDao.java index 1c83dad732..3c549b6e7f 100644 --- a/api/src/main/java/marquez/db/JobDao.java +++ b/api/src/main/java/marquez/db/JobDao.java @@ -35,6 +35,7 @@ public interface JobDao { + "created_at, " + "updated_at, " + "namespace_uuid, " + + "namespace_name, " + "name, " + "description, " + "current_version_uuid" @@ -44,6 +45,7 @@ public interface JobDao { + ":createdAt, " + ":updatedAt, " + ":namespaceUuid, " + + ":namespaceName, " + ":name, " + ":description, " + ":currentVersionUuid)") @@ -80,9 +82,8 @@ public interface JobDao { Optional find(String namespaceName, String jobName); @SqlQuery( - "SELECT j.*, n.name AS namespace_name FROM jobs AS j " - + "INNER JOIN namespaces AS n " - + " ON (n.name = :namespaceName AND j.namespace_uuid = n.uuid) " + "SELECT j.* FROM jobs AS j " + + "WHERE namespace_name = :namespaceName " + "ORDER BY j.name " + "LIMIT :limit OFFSET :offset") List findAll(String namespaceName, int limit, int offset); @@ -97,6 +98,7 @@ public interface JobDao { + "created_at, " + "updated_at, " + "namespace_uuid, " + + "namespace_name, " + "name, " + "description " + ") VALUES ( " @@ -105,6 +107,7 @@ public interface JobDao { + ":now, " + ":now, " + ":namespaceUuid, " + + ":namespaceName, " + ":name, " + ":description " + ") ON CONFLICT (name, namespace_uuid) DO " @@ -114,5 +117,5 @@ public interface JobDao { + "description = EXCLUDED.description " + "RETURNING *") JobRow upsert( - UUID uuid, JobType type, Instant now, UUID namespaceUuid, String name, String description); + UUID uuid, JobType type, Instant now, UUID namespaceUuid, String namespaceName, String name, String description); } diff --git a/api/src/main/java/marquez/db/JobVersionDao.java b/api/src/main/java/marquez/db/JobVersionDao.java index cc257238da..28efae0a23 100644 --- a/api/src/main/java/marquez/db/JobVersionDao.java +++ b/api/src/main/java/marquez/db/JobVersionDao.java @@ -98,7 +98,7 @@ default void updateOutputs(UUID versionUuid, UUID outputUuid) { void updateLatestRun(UUID rowUuid, Instant updatedAt, UUID latestRunUuid); final String EXTENDED_SELECT = - "SELECT j.namespace_uuid, jv.*, jc.uuid AS job_context_uuid, jc.context, n.name as namespace_name, j.name, " + "SELECT j.namespace_uuid, jv.*, jc.uuid AS job_context_uuid, jc.context, j.namespace_name, j.name, " + "ARRAY(SELECT dataset_uuid " + " FROM job_versions_io_mapping " + " WHERE job_version_uuid = jv.uuid AND " @@ -110,8 +110,6 @@ default void updateOutputs(UUID versionUuid, UUID outputUuid) { + "FROM job_versions AS jv " + "INNER JOIN jobs AS j " + " ON j.uuid = jv.job_uuid " - + "INNER JOIN namespaces AS n " - + " ON j.namespace_uuid = n.uuid " + "INNER JOIN job_contexts AS jc " + " ON job_context_uuid = jc.uuid "; @@ -120,7 +118,7 @@ default void updateOutputs(UUID versionUuid, UUID outputUuid) { @SqlQuery( EXTENDED_SELECT - + "WHERE n.name = :namespaceName AND j.name = :jobName AND j.current_version_uuid = jv.uuid " + + "WHERE j.namespace_name = :namespaceName AND j.name = :jobName AND j.current_version_uuid = jv.uuid " + "ORDER BY created_at DESC " + "LIMIT 1") Optional findLatest(String namespaceName, String jobName); @@ -130,7 +128,7 @@ default void updateOutputs(UUID versionUuid, UUID outputUuid) { @SqlQuery( EXTENDED_SELECT - + "WHERE n.name = :namespaceName AND j.name = :jobName " + + "WHERE j.namespace_name = :namespaceName AND j.name = :jobName " + "ORDER BY created_at DESC " + "LIMIT :limit OFFSET :offset") List findAll(String namespaceName, String jobName, int limit, int offset); diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 78f766e046..adb5d66dd2 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -106,6 +106,7 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event) { getJobType(event.getJob()), now, namespace.getUuid(), + namespace.getName(), event.getJob().getName(), description); bag.setJob(job); diff --git a/api/src/main/java/marquez/db/RunDao.java b/api/src/main/java/marquez/db/RunDao.java index a36127ce52..16678f5171 100644 --- a/api/src/main/java/marquez/db/RunDao.java +++ b/api/src/main/java/marquez/db/RunDao.java @@ -151,8 +151,7 @@ default void updateJobVersionUuid(UUID rowUuid, Instant updatedAt, UUID jobVersi SELECT_RUN + "INNER JOIN job_versions AS jv ON r.job_version_uuid = jv.uuid " + "INNER JOIN jobs AS j ON jv.job_uuid = j.uuid " - + "INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid " - + "WHERE n.name = :namespace and j.name = :jobName " + + "WHERE j.namespace_name = :namespace and j.name = :jobName " + "ORDER BY r.created_at DESC " + "LIMIT :limit OFFSET :offset") List findAll(String namespace, String jobName, int limit, int offset); diff --git a/api/src/main/resources/marquez/db/migration/V20__alter_jobs_to_add_namespace_name.sql b/api/src/main/resources/marquez/db/migration/V20__alter_jobs_to_add_namespace_name.sql new file mode 100644 index 0000000000..ad3f0a5f2a --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V20__alter_jobs_to_add_namespace_name.sql @@ -0,0 +1,2 @@ +ALTER TABLE jobs ADD namespace_name VARCHAR(255); +UPDATE jobs SET namespace_name = namespaces.name FROM namespaces WHERE jobs.namespace_uuid = namespaces.uuid; \ No newline at end of file From ba9a8791f3be4d82948fac73c7cdb137d578e7e8 Mon Sep 17 00:00:00 2001 From: henneberger Date: Tue, 9 Feb 2021 12:54:09 -0800 Subject: [PATCH 2/2] Add spotless Signed-off-by: henneberger --- api/src/main/java/marquez/db/JobDao.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/api/src/main/java/marquez/db/JobDao.java b/api/src/main/java/marquez/db/JobDao.java index 3c549b6e7f..15b4f4ae6f 100644 --- a/api/src/main/java/marquez/db/JobDao.java +++ b/api/src/main/java/marquez/db/JobDao.java @@ -117,5 +117,11 @@ public interface JobDao { + "description = EXCLUDED.description " + "RETURNING *") JobRow upsert( - UUID uuid, JobType type, Instant now, UUID namespaceUuid, String namespaceName, String name, String description); + UUID uuid, + JobType type, + Instant now, + UUID namespaceUuid, + String namespaceName, + String name, + String description); }