diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java index d48e9f394e..c00ae23764 100644 --- a/api/src/main/java/marquez/db/LineageDao.java +++ b/api/src/main/java/marquez/db/LineageDao.java @@ -92,42 +92,11 @@ WHERE ds.uuid IN ()""") Optional getJobFromInputOrOutput(String datasetName, String namespaceName); @SqlQuery( - "WITH latest_runs AS (\n" - + " SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version\n" + "SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version as job_version\n" + " FROM runs_view r\n" + " INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n" + " INNER JOIN jobs_view j ON j.uuid=jv.job_uuid\n" + " WHERE j.uuid in () OR j.symlink_target_uuid IN ()\n" - + " ORDER BY r.job_name, r.namespace_name, created_at DESC\n" - + ")\n" - + "SELECT r.*, ra.args, ctx.context, f.facets,\n" - + " r.version AS job_version, ri.input_versions, ro.output_versions\n" - + " from latest_runs AS r\n" - + "LEFT JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n" - + "LEFT JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid\n" - + "LEFT JOIN LATERAL (\n" - + " SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets\n" - + " FROM lineage_events le\n" - + " WHERE le.run_uuid=r.uuid\n" - + " GROUP BY le.run_uuid\n" - + ") AS f ON r.uuid=f.run_uuid\n" - + "LEFT JOIN LATERAL (\n" - + " SELECT im.run_uuid,\n" - + " JSON_AGG(json_build_object('namespace', dv.namespace_name,\n" - + " 'name', dv.dataset_name,\n" - + " 'version', dv.version)) AS input_versions\n" - + " FROM runs_input_mapping im\n" - + " INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid\n" - + " WHERE im.run_uuid=r.uuid\n" - + " GROUP BY im.run_uuid\n" - + ") ri ON ri.run_uuid=r.uuid\n" - + "LEFT JOIN LATERAL (\n" - + " SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,\n" - + " 'name', dataset_name,\n" - + " 'version', version)) AS output_versions\n" - + " FROM dataset_versions\n" - + " WHERE run_uuid=r.uuid\n" - + " GROUP BY run_uuid\n" - + ") ro ON ro.run_uuid=r.uuid") + + " ORDER BY r.job_name, r.namespace_name, created_at DESC") List getCurrentRuns(@BindList Collection jobUuid); } diff --git a/api/src/main/java/marquez/db/mappers/RunMapper.java b/api/src/main/java/marquez/db/mappers/RunMapper.java index fbccbde139..3a1f95eb64 100644 --- a/api/src/main/java/marquez/db/mappers/RunMapper.java +++ b/api/src/main/java/marquez/db/mappers/RunMapper.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; @@ -71,7 +72,7 @@ public Run map(@NonNull ResultSet results, @NonNull StatementContext context) ? timestampOrNull(results, columnPrefix + Columns.ENDED_AT) : null, durationMs.orElse(null), - toArgs(results, columnPrefix + Columns.ARGS), + toArgsOrNull(results, columnPrefix + Columns.ARGS), stringOrThrow(results, columnPrefix + Columns.NAMESPACE_NAME), stringOrThrow(results, columnPrefix + Columns.JOB_NAME), uuidOrNull(results, columnPrefix + Columns.JOB_VERSION), @@ -82,7 +83,9 @@ public Run map(@NonNull ResultSet results, @NonNull StatementContext context) columnNames.contains(columnPrefix + Columns.OUTPUT_VERSIONS) ? toDatasetVersion(results, columnPrefix + Columns.OUTPUT_VERSIONS) : ImmutableList.of(), - JobMapper.toContext(results, columnPrefix + Columns.CONTEXT), + columnNames.contains(columnPrefix + Columns.CONTEXT) + ? JobMapper.toContext(results, columnPrefix + Columns.CONTEXT) + : null, toFacetsOrNull(results, columnPrefix + Columns.FACETS)); } @@ -94,8 +97,12 @@ private List toDatasetVersion(ResultSet rs, String column) thr return Utils.fromJson(dsString, new TypeReference>() {}); } - private Map toArgs(ResultSet results, String column) throws SQLException { - String args = stringOrNull(results, column); + private Map toArgsOrNull(ResultSet results, String argsColumn) + throws SQLException { + if (!Columns.exists(results, argsColumn)) { + return ImmutableMap.of(); + } + String args = stringOrNull(results, argsColumn); if (args == null) { return null; } diff --git a/api/src/test/java/marquez/service/LineageServiceTest.java b/api/src/test/java/marquez/service/LineageServiceTest.java index 47641d4faa..8248adbeeb 100644 --- a/api/src/test/java/marquez/service/LineageServiceTest.java +++ b/api/src/test/java/marquez/service/LineageServiceTest.java @@ -158,9 +158,6 @@ public void testLineage() { runAssert .extracting(Run::getInputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class)) .hasSize(0); - runAssert - .extracting(Run::getOutputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class)) - .hasSize(1); // check the output edges for the commonDataset node assertThat(lineage.getGraph()) @@ -266,9 +263,6 @@ public void testLineageWithDeletedDataset() { runAssert .extracting(Run::getInputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class)) .hasSize(0); - runAssert - .extracting(Run::getOutputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class)) - .hasSize(1); // check the output edges for the commonDataset node assertThat(lineage.getGraph())