add new lightweight current runs query
Signed-off-by: Prachi Mishra <[email protected]>
prachim-collab committed Oct 31, 2022
1 parent 12702f3 commit 52ebaa3
Showing 6 changed files with 119 additions and 12 deletions.
api/src/main/java/marquez/api/OpenLineageResource.java (2 changes: 1 addition & 1 deletion)

@@ -96,7 +96,7 @@ private int determineStatusCode(Throwable e) {
public Response getLineage(
@QueryParam("nodeId") @NotNull NodeId nodeId,
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) {
return Response.ok(lineageService.lineage(nodeId, depth)).build();
return Response.ok(lineageService.lineage(nodeId, depth, true)).build();
}

@Timed
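The endpoint keeps its existing behavior by always requesting run facets (the hard-coded true above). Purely as an illustration of the new flag at the HTTP layer, and not part of this commit, a variant that lets clients opt out could expose it as a query parameter along these lines:

// Hypothetical sketch only: the "withRunFacets" query parameter does not exist in this commit.
public Response getLineage(
    @QueryParam("nodeId") @NotNull NodeId nodeId,
    @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth,
    @QueryParam("withRunFacets") @DefaultValue("true") boolean withRunFacets) {
  return Response.ok(lineageService.lineage(nodeId, depth, withRunFacets)).build();
}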
api/src/main/java/marquez/db/LineageDao.java (42 changes: 41 additions & 1 deletion)

@@ -91,12 +91,52 @@ WHERE ds.uuid IN (<dsUuids>)""")
LIMIT 1""")
Optional<UUID> getJobFromInputOrOutput(String datasetName, String namespaceName);

@SqlQuery(
"WITH latest_runs AS (\n"
+ " SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version\n"
+ " FROM runs_view r\n"
+ " INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n"
+ " INNER JOIN jobs_view j ON j.uuid=jv.job_uuid\n"
+ " WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)\n"
+ " ORDER BY r.job_name, r.namespace_name, created_at DESC\n"
+ ")\n"
+ "SELECT r.*, ra.args, ctx.context, f.facets,\n"
+ " r.version AS job_version, ri.input_versions, ro.output_versions\n"
+ " from latest_runs AS r\n"
+ "LEFT JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n"
+ "LEFT JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid\n"
+ "LEFT JOIN LATERAL (\n"
+ " SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets\n"
+ " FROM lineage_events le\n"
+ " WHERE le.run_uuid=r.uuid\n"
+ " GROUP BY le.run_uuid\n"
+ ") AS f ON r.uuid=f.run_uuid\n"
+ "LEFT JOIN LATERAL (\n"
+ " SELECT im.run_uuid,\n"
+ " JSON_AGG(json_build_object('namespace', dv.namespace_name,\n"
+ " 'name', dv.dataset_name,\n"
+ " 'version', dv.version)) AS input_versions\n"
+ " FROM runs_input_mapping im\n"
+ " INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid\n"
+ " WHERE im.run_uuid=r.uuid\n"
+ " GROUP BY im.run_uuid\n"
+ ") ri ON ri.run_uuid=r.uuid\n"
+ "LEFT JOIN LATERAL (\n"
+ " SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,\n"
+ " 'name', dataset_name,\n"
+ " 'version', version)) AS output_versions\n"
+ " FROM dataset_versions\n"
+ " WHERE run_uuid=r.uuid\n"
+ " GROUP BY run_uuid\n"
+ ") ro ON ro.run_uuid=r.uuid")
List<Run> getCurrentRunsWithFacets(@BindList Collection<UUID> jobUuid);

@SqlQuery(
"SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version as job_version\n"
+ " FROM runs_view r\n"
+ " INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n"
+ " INNER JOIN jobs_view j ON j.uuid=jv.job_uuid\n"
+ " WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)\n"
+ " ORDER BY r.job_name, r.namespace_name, created_at DESC")
+ " ORDER BY r.job_name, r.namespace_name, created_at DESC\n")
List<Run> getCurrentRuns(@BindList Collection<UUID> jobUuid);
}
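For context, a minimal sketch of how a caller might choose between the two DAO methods: getCurrentRunsWithFacets hydrates run args, run facets, and input/output dataset versions through the lateral joins above, while getCurrentRuns stays lightweight and only resolves the latest run per job. The wrapper class is illustrative; only the two LineageDao methods come from this commit, and imports are omitted (the types match the signatures above).

// Sketch, not part of this commit: pick the query based on whether facet data is needed.
class CurrentRunsLookup {
  private final LineageDao lineageDao;

  CurrentRunsLookup(LineageDao lineageDao) {
    this.lineageDao = lineageDao;
  }

  List<Run> latestRuns(Collection<UUID> jobUuids, boolean withFacets) {
    return withFacets
        ? lineageDao.getCurrentRunsWithFacets(jobUuids) // args, facets, input/output versions
        : lineageDao.getCurrentRuns(jobUuids);          // latest run row per job only
  }
}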
api/src/main/java/marquez/service/LineageService.java (8 changes: 6 additions & 2 deletions)

@@ -46,7 +46,7 @@ public LineageService(LineageDao delegate, JobDao jobDao) {
this.jobDao = jobDao;
}

public Lineage lineage(NodeId nodeId, int depth) {
public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
Optional<UUID> optionalUUID = getJobUuid(nodeId);
if (optionalUUID.isEmpty()) {
throw new NodeIdNotFoundException("Could not find node");
@@ -56,7 +56,11 @@ public Lineage lineage(NodeId nodeId, int depth) {
Set<JobData> jobData = getLineage(Collections.singleton(job), depth);

List<Run> runs =
getCurrentRuns(jobData.stream().map(JobData::getUuid).collect(Collectors.toSet()));
withRunFacets
? getCurrentRunsWithFacets(
jobData.stream().map(JobData::getUuid).collect(Collectors.toSet()))
: getCurrentRuns(jobData.stream().map(JobData::getUuid).collect(Collectors.toSet()));

// todo fix runtime
for (JobData j : jobData) {
if (j.getLatestRun().isEmpty()) {
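From the caller's side, lineage(...) now takes a third argument that selects the heavyweight or lightweight run lookup. A minimal usage sketch, assuming an already-wired LineageService and with imports omitted (they match the tests below):

// Sketch only: a caller that wants the lineage graph without run facets.
Lineage graphOnly(LineageService lineageService, NodeId nodeId) {
  // depth 2 as in the tests below; false selects the lighter getCurrentRuns query
  return lineageService.lineage(nodeId, 2, false);
}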
api/src/test/java/marquez/api/OpenLineageResourceTest.java (3 changes: 2 additions & 1 deletion)

@@ -7,6 +7,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -41,7 +42,7 @@ class OpenLineageResourceTest {
OpenLineageResourceTest.class.getResourceAsStream("/lineage/node.json"),
new TypeReference<>() {});
LINEAGE = new Lineage(ImmutableSortedSet.of(testNode));
when(lineageService.lineage(any(NodeId.class), anyInt())).thenReturn(LINEAGE);
when(lineageService.lineage(any(NodeId.class), anyInt(), anyBoolean())).thenReturn(LINEAGE);

ServiceFactory serviceFactory =
ApiTestUtils.mockServiceFactory(Map.of(LineageService.class, lineageService));
api/src/test/java/marquez/db/LineageDaoTest.java (54 changes: 52 additions & 2 deletions)

@@ -773,7 +773,7 @@ public void testGetCurrentRuns() {
Stream.of(writeJob.getJob().getUuid()), newRows.stream().map(JobLineage::getId))
.collect(Collectors.toSet());

List<Run> currentRuns = lineageDao.getCurrentRuns(jobids);
List<Run> currentRuns = lineageDao.getCurrentRunsWithFacets(jobids);

// assert the job does exist
assertThat(currentRuns)
@@ -790,7 +790,7 @@ public void testGetCurrentRunsWithFailedJob() {

Set<UUID> jobids = Collections.singleton(writeJob.getJob().getUuid());

List<Run> currentRuns = lineageDao.getCurrentRuns(jobids);
List<Run> currentRuns = lineageDao.getCurrentRunsWithFacets(jobids);

// assert the job does exist
assertThat(currentRuns)
@@ -799,6 +799,56 @@
.contains(writeJob.getRun().getUuid());
}

@Test
public void testGetCurrentRunsWithFacetsGetsLatestRun() {
for (int i = 0; i < 5; i++) {
LineageTestUtils.createLineageRow(
openLineageDao,
"writeJob",
"COMPLETE",
jobFacet,
Arrays.asList(),
Arrays.asList(dataset));
}

List<JobLineage> newRows =
writeDownstreamLineage(
openLineageDao,
new LinkedList<>(
Arrays.asList(
new DatasetConsumerJob("readJob", 3, Optional.of("outputData2")),
new DatasetConsumerJob("downstreamJob", 1, Optional.empty()))),
jobFacet,
dataset);
UpdateLineageRow writeJob =
LineageTestUtils.createLineageRow(
openLineageDao, "writeJob", "FAIL", jobFacet, Arrays.asList(), Arrays.asList(dataset));

Set<UUID> expectedRunIds =
Stream.concat(
Stream.of(writeJob.getRun().getUuid()), newRows.stream().map(JobLineage::getRunId))
.collect(Collectors.toSet());
Set<UUID> jobids =
Stream.concat(
Stream.of(writeJob.getJob().getUuid()), newRows.stream().map(JobLineage::getId))
.collect(Collectors.toSet());

List<Run> currentRuns = lineageDao.getCurrentRunsWithFacets(jobids);

// assert the job does exist
assertThat(currentRuns)
.hasSize(expectedRunIds.size())
.extracting(r -> r.getId().getValue())
.containsAll(expectedRunIds);

// assert that run_args, input/output versions, and run facets are fetched from the dao.
for (Run run : currentRuns) {
assertThat(run.getArgs()).hasSize(2);
assertThat(run.getOutputVersions()).hasSize(1);
assertThat(run.getFacets()).hasSize(1);
}
}

@Test
public void testGetCurrentRunsGetsLatestRun() {
for (int i = 0; i < 5; i++) {
api/src/test/java/marquez/service/LineageServiceTest.java (22 changes: 17 additions & 5 deletions)

@@ -124,7 +124,8 @@ public void testLineage() {
dataset);
String jobName = writeJob.getJob().getName();
Lineage lineage =
lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2);
lineageService.lineage(
NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2, true);

// 1 writeJob + 1 commonDataset
// 20 readJob + 20 outputData
@@ -158,6 +159,9 @@ public void testLineage() {
runAssert
.extracting(Run::getInputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class))
.hasSize(0);
runAssert
.extracting(Run::getOutputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class))
.hasSize(1);

// check the output edges for the commonDataset node
assertThat(lineage.getGraph())
@@ -229,7 +233,8 @@ public void testLineageWithDeletedDataset() {

String jobName = writeJob.getJob().getName();
Lineage lineage =
lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2);
lineageService.lineage(
NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2, true);

// 1 writeJob + 0 commonDataset is hidden
// 20 readJob + 20 outputData
@@ -263,6 +268,9 @@ public void testLineageWithDeletedDataset() {
runAssert
.extracting(Run::getInputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class))
.hasSize(0);
runAssert
.extracting(Run::getOutputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class))
.hasSize(1);

// check the output edges for the commonDataset node
assertThat(lineage.getGraph())
@@ -275,7 +283,8 @@ public void testLineageWithDeletedDataset() {
jobDao.delete(NAMESPACE, "downstreamJob0<-outputData<-readJob0<-commonDataset");

lineage =
lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2);
lineageService.lineage(
NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2, true);

// 1 writeJob + 0 commonDataset is hidden
// 20 readJob + 20 outputData
@@ -305,7 +314,9 @@ public void testLineageWithNoDatasets() {
openLineageDao, "writeJob", "COMPLETE", jobFacet, Arrays.asList(), Arrays.asList());
Lineage lineage =
lineageService.lineage(
NodeId.of(new NamespaceName(NAMESPACE), new JobName(writeJob.getJob().getName())), 5);
NodeId.of(new NamespaceName(NAMESPACE), new JobName(writeJob.getJob().getName())),
5,
true);
assertThat(lineage.getGraph())
.hasSize(1)
.first()
@@ -356,7 +367,8 @@ public void testLineageWithWithCycle() {
lineageService.lineage(
NodeId.of(
new NamespaceName(NAMESPACE), new JobName(intermediateJob.getJob().getName())),
5);
5,
true);
assertThat(lineage.getGraph()).extracting(Node::getId).hasSize(6);
ObjectAssert<Node> datasetNode =
assertThat(lineage.getGraph())
Expand Down
