Skip to content

Commit

Permalink
optimizing current runs query for lieage api (#2211)
Browse files Browse the repository at this point in the history
* optimizing current runs query for lieage api

Signed-off-by: Prachi Mishra <[email protected]>

* removing assert

Signed-off-by: Prachi Mishra <[email protected]>

* add new lightweight current runs query

Signed-off-by: Prachi Mishra <[email protected]>

* addressing review comments

Signed-off-by: Prachi Mishra <[email protected]>

Signed-off-by: Prachi Mishra <[email protected]>
  • Loading branch information
prachim-collab authored Nov 2, 2022
1 parent 4b86615 commit 6dad6aa
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 16 deletions.
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private int determineStatusCode(Throwable e) {
public Response getLineage(
@QueryParam("nodeId") @NotNull NodeId nodeId,
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) {
return Response.ok(lineageService.lineage(nodeId, depth)).build();
return Response.ok(lineageService.lineage(nodeId, depth, true)).build();
}

@Timed
Expand Down
10 changes: 10 additions & 0 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,15 @@ WHERE ds.uuid IN (<dsUuids>)""")
+ " WHERE run_uuid=r.uuid\n"
+ " GROUP BY run_uuid\n"
+ ") ro ON ro.run_uuid=r.uuid")
List<Run> getCurrentRunsWithFacets(@BindList Collection<UUID> jobUuid);

@SqlQuery(
"""
SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version as job_version
FROM runs_view r
INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid
INNER JOIN jobs_view j ON j.uuid=jv.job_uuid
WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)
ORDER BY r.job_name, r.namespace_name, created_at DESC""")
List<Run> getCurrentRuns(@BindList Collection<UUID> jobUuid);
}
15 changes: 11 additions & 4 deletions api/src/main/java/marquez/db/mappers/RunMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
Expand Down Expand Up @@ -71,7 +72,7 @@ public Run map(@NonNull ResultSet results, @NonNull StatementContext context)
? timestampOrNull(results, columnPrefix + Columns.ENDED_AT)
: null,
durationMs.orElse(null),
toArgs(results, columnPrefix + Columns.ARGS),
toArgsOrNull(results, columnPrefix + Columns.ARGS),
stringOrThrow(results, columnPrefix + Columns.NAMESPACE_NAME),
stringOrThrow(results, columnPrefix + Columns.JOB_NAME),
uuidOrNull(results, columnPrefix + Columns.JOB_VERSION),
Expand All @@ -82,7 +83,9 @@ public Run map(@NonNull ResultSet results, @NonNull StatementContext context)
columnNames.contains(columnPrefix + Columns.OUTPUT_VERSIONS)
? toDatasetVersion(results, columnPrefix + Columns.OUTPUT_VERSIONS)
: ImmutableList.of(),
JobMapper.toContext(results, columnPrefix + Columns.CONTEXT),
columnNames.contains(columnPrefix + Columns.CONTEXT)
? JobMapper.toContext(results, columnPrefix + Columns.CONTEXT)
: null,
toFacetsOrNull(results, columnPrefix + Columns.FACETS));
}

Expand All @@ -94,8 +97,12 @@ private List<DatasetVersionId> toDatasetVersion(ResultSet rs, String column) thr
return Utils.fromJson(dsString, new TypeReference<List<DatasetVersionId>>() {});
}

private Map<String, String> toArgs(ResultSet results, String column) throws SQLException {
String args = stringOrNull(results, column);
private Map<String, String> toArgsOrNull(ResultSet results, String argsColumn)
throws SQLException {
if (!Columns.exists(results, argsColumn)) {
return ImmutableMap.of();
}
String args = stringOrNull(results, argsColumn);
if (args == null) {
return null;
}
Expand Down
10 changes: 7 additions & 3 deletions api/src/main/java/marquez/service/LineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public LineageService(LineageDao delegate, JobDao jobDao) {
this.jobDao = jobDao;
}

public Lineage lineage(NodeId nodeId, int depth) {
// TODO make input parameters easily extendable if adding more options like 'withJobFacets'
public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
Optional<UUID> optionalUUID = getJobUuid(nodeId);
if (optionalUUID.isEmpty()) {
throw new NodeIdNotFoundException("Could not find node");
Expand All @@ -56,8 +57,11 @@ public Lineage lineage(NodeId nodeId, int depth) {
Set<JobData> jobData = getLineage(Collections.singleton(job), depth);

List<Run> runs =
getCurrentRuns(jobData.stream().map(JobData::getUuid).collect(Collectors.toSet()));
// todo fix runtime
withRunFacets
? getCurrentRunsWithFacets(
jobData.stream().map(JobData::getUuid).collect(Collectors.toSet()))
: getCurrentRuns(jobData.stream().map(JobData::getUuid).collect(Collectors.toSet()));

for (JobData j : jobData) {
if (j.getLatestRun().isEmpty()) {
for (Run run : runs) {
Expand Down
3 changes: 2 additions & 1 deletion api/src/test/java/marquez/api/OpenLineageResourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -41,7 +42,7 @@ class OpenLineageResourceTest {
OpenLineageResourceTest.class.getResourceAsStream("/lineage/node.json"),
new TypeReference<>() {});
LINEAGE = new Lineage(ImmutableSortedSet.of(testNode));
when(lineageService.lineage(any(NodeId.class), anyInt())).thenReturn(LINEAGE);
when(lineageService.lineage(any(NodeId.class), anyInt(), anyBoolean())).thenReturn(LINEAGE);

ServiceFactory serviceFactory =
ApiTestUtils.mockServiceFactory(Map.of(LineageService.class, lineageService));
Expand Down
54 changes: 52 additions & 2 deletions api/src/test/java/marquez/db/LineageDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ public void testGetCurrentRuns() {
Stream.of(writeJob.getJob().getUuid()), newRows.stream().map(JobLineage::getId))
.collect(Collectors.toSet());

List<Run> currentRuns = lineageDao.getCurrentRuns(jobids);
List<Run> currentRuns = lineageDao.getCurrentRunsWithFacets(jobids);

// assert the job does exist
assertThat(currentRuns)
Expand All @@ -790,7 +790,7 @@ public void testGetCurrentRunsWithFailedJob() {

Set<UUID> jobids = Collections.singleton(writeJob.getJob().getUuid());

List<Run> currentRuns = lineageDao.getCurrentRuns(jobids);
List<Run> currentRuns = lineageDao.getCurrentRunsWithFacets(jobids);

// assert the job does exist
assertThat(currentRuns)
Expand All @@ -799,6 +799,56 @@ public void testGetCurrentRunsWithFailedJob() {
.contains(writeJob.getRun().getUuid());
}

@Test
public void testGetCurrentRunsWithFacetsGetsLatestRun() {
for (int i = 0; i < 5; i++) {
LineageTestUtils.createLineageRow(
openLineageDao,
"writeJob",
"COMPLETE",
jobFacet,
Arrays.asList(),
Arrays.asList(dataset));
}

List<JobLineage> newRows =
writeDownstreamLineage(
openLineageDao,
new LinkedList<>(
Arrays.asList(
new DatasetConsumerJob("readJob", 3, Optional.of("outputData2")),
new DatasetConsumerJob("downstreamJob", 1, Optional.empty()))),
jobFacet,
dataset);
UpdateLineageRow writeJob =
LineageTestUtils.createLineageRow(
openLineageDao, "writeJob", "FAIL", jobFacet, Arrays.asList(), Arrays.asList(dataset));

Set<UUID> expectedRunIds =
Stream.concat(
Stream.of(writeJob.getRun().getUuid()), newRows.stream().map(JobLineage::getRunId))
.collect(Collectors.toSet());
Set<UUID> jobids =
Stream.concat(
Stream.of(writeJob.getJob().getUuid()), newRows.stream().map(JobLineage::getId))
.collect(Collectors.toSet());

List<Run> currentRuns = lineageDao.getCurrentRunsWithFacets(jobids);

// assert the job does exist
assertThat(currentRuns)
.hasSize(expectedRunIds.size())
.extracting(r -> r.getId().getValue())
.containsAll(expectedRunIds);

// assert that run_args, input/output versions, and run facets are fetched from the dao.
for (Run run : currentRuns) {
assertThat(run.getArgs()).hasSize(2);
assertThat(run.getOutputVersions()).hasSize(1);
assertThat(run.getFacets()).hasSize(1);
}
}

@Test
public void testGetCurrentRunsGetsLatestRun() {
for (int i = 0; i < 5; i++) {
Expand Down
16 changes: 11 additions & 5 deletions api/src/test/java/marquez/service/LineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ public void testLineage() {
dataset);
String jobName = writeJob.getJob().getName();
Lineage lineage =
lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2);
lineageService.lineage(
NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2, true);

// 1 writeJob + 1 commonDataset
// 20 readJob + 20 outputData
Expand Down Expand Up @@ -232,7 +233,8 @@ public void testLineageWithDeletedDataset() {

String jobName = writeJob.getJob().getName();
Lineage lineage =
lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2);
lineageService.lineage(
NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2, true);

// 1 writeJob + 0 commonDataset is hidden
// 20 readJob + 20 outputData
Expand Down Expand Up @@ -281,7 +283,8 @@ public void testLineageWithDeletedDataset() {
jobDao.delete(NAMESPACE, "downstreamJob0<-outputData<-readJob0<-commonDataset");

lineage =
lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2);
lineageService.lineage(
NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2, true);

// 1 writeJob + 0 commonDataset is hidden
// 20 readJob + 20 outputData
Expand Down Expand Up @@ -311,7 +314,9 @@ public void testLineageWithNoDatasets() {
openLineageDao, "writeJob", "COMPLETE", jobFacet, Arrays.asList(), Arrays.asList());
Lineage lineage =
lineageService.lineage(
NodeId.of(new NamespaceName(NAMESPACE), new JobName(writeJob.getJob().getName())), 5);
NodeId.of(new NamespaceName(NAMESPACE), new JobName(writeJob.getJob().getName())),
5,
true);
assertThat(lineage.getGraph())
.hasSize(1)
.first()
Expand Down Expand Up @@ -362,7 +367,8 @@ public void testLineageWithWithCycle() {
lineageService.lineage(
NodeId.of(
new NamespaceName(NAMESPACE), new JobName(intermediateJob.getJob().getName())),
5);
5,
true);
assertThat(lineage.getGraph()).extracting(Node::getId).hasSize(6);
ObjectAssert<Node> datasetNode =
assertThat(lineage.getGraph())
Expand Down

0 comments on commit 6dad6aa

Please sign in to comment.