Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Runs row reduction #2041

Merged
merged 2 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions api/src/main/java/marquez/db/JobVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -286,22 +286,19 @@ default List<UUID> findOutputDatasetsFor(UUID jobVersionUuid) {
* code location, and context. A version for a given job is created <i>only</i> when a {@link Run}
* transitions into a {@code COMPLETED}, {@code ABORTED}, or {@code FAILED} state.
*
* @param namespaceName The namespace for the job version.
* @param jobName The name of the job.
* @param jobRow The job.
* @param runUuid The unique ID of the run associated with the job version.
* @param runState The current run state.
* @param transitionedAt The timestamp of the run state transition.
* @return A {@link BagOfJobVersionInfo} object.
*/
default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
@NonNull String namespaceName,
@NonNull String jobName,
@NonNull JobRow jobRow,
@NonNull UUID runUuid,
@NonNull RunState runState,
@NonNull Instant transitionedAt) {
// Get the job.
final JobDao jobDao = createJobDao();
final JobRow jobRow = jobDao.findJobByNameAsRow(namespaceName, jobName).get();

// Get the job context.
final UUID jobContextUuid = jobRow.getJobContextUuid().get();
Expand Down
61 changes: 30 additions & 31 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import marquez.db.models.DatasetFieldRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetVersionRow;
import marquez.db.models.ExtendedRunRow;
import marquez.db.models.JobContextRow;
import marquez.db.models.JobRow;
import marquez.db.models.NamespaceRow;
Expand Down Expand Up @@ -161,10 +160,9 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
PGobject inputs = new PGobject();
inputs.setType("json");
inputs.setValue("[]");
Optional<ExtendedRunRow> parentRunRow = runDao.findRunByUuidAsRow(uuid);
JobRow parentJobRow =
parentRunRow
.flatMap(run -> jobDao.findJobByUuidAsRow(run.getJobUuid()))
runDao
.findJobRowByRunUuid(uuid)
.orElseGet(
() -> {
JobRow newParentJobRow =
Expand All @@ -181,34 +179,36 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
null,
inputs);
log.info("Created new parent job record {}", newParentJobRow);

RunArgsRow argsRow =
runArgsDao.upsertRunArgs(
UUID.randomUUID(),
now,
"{}",
Utils.checksumFor(ImmutableMap.of()));
RunRow newRow =
runDao.upsert(
uuid,
null,
facet.getRun().getRunId(),
now,
newParentJobRow.getUuid(),
null,
argsRow.getUuid(),
nominalStartTime,
nominalEndTime,
Optional.ofNullable(event.getEventType())
.map(this::getRunState)
.orElse(null),
now,
namespace.getName(),
newParentJobRow.getName(),
newParentJobRow.getLocation(),
newParentJobRow.getJobContextUuid().orElse(null));
log.info("Created new parent run record {}", newRow);
return newParentJobRow;
});
log.debug("Found parent job record {}", parentJobRow);
if (parentRunRow.isEmpty()) {
RunArgsRow argsRow =
runArgsDao.upsertRunArgs(
UUID.randomUUID(), now, "{}", Utils.checksumFor(ImmutableMap.of()));
ExtendedRunRow newRow =
runDao.upsert(
uuid,
null,
facet.getRun().getRunId(),
now,
parentJobRow.getUuid(),
null,
argsRow.getUuid(),
nominalStartTime,
nominalEndTime,
Optional.ofNullable(event.getEventType())
.map(this::getRunState)
.orElse(null),
now,
namespace.getName(),
parentJobRow.getName(),
parentJobRow.getLocation(),
parentJobRow.getJobContextUuid().orElse(null));
log.info("Created new parent run record {}", newRow);
}
return parentJobRow;
} catch (Exception e) {
throw new RuntimeException("Unable to insert parent run", e);
Expand Down Expand Up @@ -390,8 +390,7 @@ default void updateMarquezOnComplete(
BagOfJobVersionInfo bagOfJobVersionInfo =
createJobVersionDao()
.upsertJobVersionOnRunTransition(
updateLineageRow.getRun().getNamespaceName(),
updateLineageRow.getRun().getJobName(),
updateLineageRow.getJob(),
updateLineageRow.getRun().getUuid(),
runState,
event.getEventTime().toInstant());
Expand Down
95 changes: 20 additions & 75 deletions api/src/main/java/marquez/db/RunDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import marquez.common.models.RunId;
import marquez.common.models.RunState;
import marquez.db.mappers.ExtendedRunRowMapper;
import marquez.db.mappers.JobRowMapper;
import marquez.db.mappers.RunMapper;
import marquez.db.mappers.RunRowMapper;
import marquez.db.models.DatasetRow;
import marquez.db.models.ExtendedRunRow;
import marquez.db.models.JobRow;
Expand All @@ -40,7 +42,9 @@
import org.jdbi.v3.sqlobject.transaction.Transaction;

@RegisterRowMapper(ExtendedRunRowMapper.class)
@RegisterRowMapper(RunRowMapper.class)
@RegisterRowMapper(RunMapper.class)
@RegisterRowMapper(JobRowMapper.class)
public interface RunDao extends BaseDao {
@SqlQuery("SELECT EXISTS (SELECT 1 FROM runs WHERE uuid = :rowUuid)")
boolean exists(UUID rowUuid);
Expand Down Expand Up @@ -103,7 +107,18 @@ public interface RunDao extends BaseDao {
Optional<Run> findRunByUuid(UUID runUuid);

@SqlQuery(BASE_FIND_RUN_SQL + "WHERE r.uuid = :runUuid")
Optional<ExtendedRunRow> findRunByUuidAsRow(UUID runUuid);
Optional<ExtendedRunRow> findRunByUuidAsExtendedRow(UUID runUuid);

@SqlQuery("SELECT * FROM runs r WHERE r.uuid = :runUuid")
Optional<RunRow> findRunByUuidAsRow(UUID runUuid);

@SqlQuery(
"""
SELECT j.* FROM jobs_view j
INNER JOIN runs_view r ON r.job_uuid=j.uuid
WHERE r.uuid=:uuid
""")
Optional<JobRow> findJobRowByRunUuid(UUID uuid);

@SqlQuery(
"""
Expand Down Expand Up @@ -195,8 +210,8 @@ SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,
+ "nominal_start_time = COALESCE(EXCLUDED.nominal_start_time, runs.nominal_start_time), "
+ "nominal_end_time = COALESCE(EXCLUDED.nominal_end_time, runs.nominal_end_time), "
+ "location = EXCLUDED.location "
+ "RETURNING uuid")
UUID upsertWithRunState(
+ "RETURNING *")
RunRow upsert(
UUID runUuid,
UUID parentRunUuid,
String externalId,
Expand All @@ -213,42 +228,6 @@ UUID upsertWithRunState(
String location,
UUID jobContextUuid);

default ExtendedRunRow upsert(
UUID runUuid,
UUID parentRunUuid,
String externalId,
Instant now,
UUID jobUuid,
UUID jobVersionUuid,
UUID runArgsUuid,
Instant nominalStartTime,
Instant nominalEndTime,
RunState runStateType,
Instant runStateTime,
String namespaceName,
String jobName,
String location,
UUID jobContextUuid) {
UUID rowUuid =
upsertWithRunState(
runUuid,
parentRunUuid,
externalId,
now,
jobUuid,
jobVersionUuid,
runArgsUuid,
nominalStartTime,
nominalEndTime,
runStateType,
runStateTime,
namespaceName,
jobName,
location,
jobContextUuid);
return findRunByUuidAsRow(rowUuid).get();
}

@SqlQuery(
"INSERT INTO runs ( "
+ "uuid, "
Expand Down Expand Up @@ -286,8 +265,8 @@ default ExtendedRunRow upsert(
+ "nominal_start_time = COALESCE(EXCLUDED.nominal_start_time, runs.nominal_start_time), "
+ "nominal_end_time = COALESCE(EXCLUDED.nominal_end_time, runs.nominal_end_time), "
+ "location = EXCLUDED.location "
+ "RETURNING uuid")
UUID upsertWithoutRunState(
+ "RETURNING *")
RunRow upsert(
UUID runUuid,
UUID parentRunUuid,
String externalId,
Expand All @@ -303,40 +282,6 @@ UUID upsertWithoutRunState(
String location,
UUID jobContextUuid);

default ExtendedRunRow upsert(
UUID runUuid,
UUID parentRunUuid,
String externalId,
Instant now,
UUID jobUuid,
UUID jobVersionUuid,
UUID runArgsUuid,
Instant nominalStartTime,
Instant nominalEndTime,
UUID namespaceUuid,
String namespaceName,
String jobName,
String location,
UUID jobContextUuid) {
UUID runRowUuid =
upsertWithoutRunState(
runUuid,
parentRunUuid,
externalId,
now,
jobUuid,
jobVersionUuid,
runArgsUuid,
nominalStartTime,
nominalEndTime,
namespaceUuid,
namespaceName,
jobName,
location,
jobContextUuid);
return findRunByUuidAsRow(runRowUuid).get();
}

@SqlUpdate(
"INSERT INTO runs_input_mapping (run_uuid, dataset_version_uuid) "
+ "VALUES (:runUuid, :datasetVersionUuid) ON CONFLICT DO NOTHING")
Expand Down
60 changes: 60 additions & 0 deletions api/src/main/java/marquez/db/mappers/RunRowMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.mappers;

import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.timestampOrNull;
import static marquez.db.Columns.timestampOrThrow;
import static marquez.db.Columns.uuidOrNull;
import static marquez.db.Columns.uuidOrThrow;

import com.fasterxml.jackson.core.type.TypeReference;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import lombok.NonNull;
import marquez.common.Utils;
import marquez.common.models.DatasetVersionId;
import marquez.db.Columns;
import marquez.db.models.RunRow;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

public final class RunRowMapper implements RowMapper<RunRow> {
@Override
public RunRow map(@NonNull ResultSet results, @NonNull StatementContext context)
throws SQLException {
Set<String> columnNames = MapperUtils.getColumnNames(results.getMetaData());

return new RunRow(
uuidOrThrow(results, Columns.ROW_UUID),
timestampOrThrow(results, Columns.CREATED_AT),
timestampOrThrow(results, Columns.UPDATED_AT),
uuidOrNull(results, Columns.JOB_UUID),
uuidOrNull(results, Columns.JOB_VERSION_UUID),
uuidOrNull(results, Columns.PARENT_RUN_UUID),
uuidOrThrow(results, Columns.RUN_ARGS_UUID),
timestampOrNull(results, Columns.NOMINAL_START_TIME),
timestampOrNull(results, Columns.NOMINAL_END_TIME),
stringOrNull(results, Columns.CURRENT_RUN_STATE),
columnNames.contains(Columns.STARTED_AT)
? timestampOrNull(results, Columns.STARTED_AT)
: null,
uuidOrNull(results, Columns.START_RUN_STATE_UUID),
columnNames.contains(Columns.ENDED_AT) ? timestampOrNull(results, Columns.ENDED_AT) : null,
uuidOrNull(results, Columns.END_RUN_STATE_UUID));
}

private List<DatasetVersionId> toDatasetVersion(ResultSet rs, String column) throws SQLException {
String dsString = rs.getString(column);
if (dsString == null) {
return Collections.emptyList();
}
return Utils.fromJson(dsString, new TypeReference<List<DatasetVersionId>>() {});
}
}
18 changes: 13 additions & 5 deletions api/src/main/java/marquez/db/models/ExtendedRunRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class ExtendedRunRow extends RunRow {
@Getter @NonNull private final List<DatasetVersionId> inputVersions;
@Getter @NonNull private final List<DatasetVersionId> outputVersions;
@Getter private final String namespaceName;
@Getter private final String jobName;
@Getter private final String args;

public ExtendedRunRow(
Expand Down Expand Up @@ -48,17 +52,21 @@ public ExtendedRunRow(
jobVersionUuid,
parentRunUuid,
runArgsUuid,
inputVersions,
outputVersions,
nominalStartTime,
nominalEndTime,
currentRunState,
startedAt,
startRunStateUuid,
endedAt,
endRunStateUuid,
namespaceName,
jobName);
endRunStateUuid);
this.inputVersions = inputVersions;
this.outputVersions = outputVersions;
this.args = args;
this.jobName = jobName;
this.namespaceName = namespaceName;
}

public boolean hasInputVersionUuids() {
return !inputVersions.isEmpty();
}
}
Loading