Skip to content

Commit

Permalink
Merge branch 'main' into bug/airflow-integration-test-on-fork
Browse files Browse the repository at this point in the history
  • Loading branch information
wslulciuc authored Feb 10, 2021
2 parents c519657 + 77d1404 commit e91bc49
Show file tree
Hide file tree
Showing 14 changed files with 164 additions and 69 deletions.
17 changes: 13 additions & 4 deletions api/src/main/java/marquez/db/JobDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public interface JobDao {
+ "created_at, "
+ "updated_at, "
+ "namespace_uuid, "
+ "namespace_name, "
+ "name, "
+ "description, "
+ "current_version_uuid"
Expand All @@ -44,6 +45,7 @@ public interface JobDao {
+ ":createdAt, "
+ ":updatedAt, "
+ ":namespaceUuid, "
+ ":namespaceName, "
+ ":name, "
+ ":description, "
+ ":currentVersionUuid)")
Expand Down Expand Up @@ -80,9 +82,8 @@ public interface JobDao {
Optional<JobRow> find(String namespaceName, String jobName);

@SqlQuery(
"SELECT j.*, n.name AS namespace_name FROM jobs AS j "
+ "INNER JOIN namespaces AS n "
+ " ON (n.name = :namespaceName AND j.namespace_uuid = n.uuid) "
"SELECT j.* FROM jobs AS j "
+ "WHERE namespace_name = :namespaceName "
+ "ORDER BY j.name "
+ "LIMIT :limit OFFSET :offset")
List<JobRow> findAll(String namespaceName, int limit, int offset);
Expand All @@ -97,6 +98,7 @@ public interface JobDao {
+ "created_at, "
+ "updated_at, "
+ "namespace_uuid, "
+ "namespace_name, "
+ "name, "
+ "description "
+ ") VALUES ( "
Expand All @@ -105,6 +107,7 @@ public interface JobDao {
+ ":now, "
+ ":now, "
+ ":namespaceUuid, "
+ ":namespaceName, "
+ ":name, "
+ ":description "
+ ") ON CONFLICT (name, namespace_uuid) DO "
Expand All @@ -114,5 +117,11 @@ public interface JobDao {
+ "description = EXCLUDED.description "
+ "RETURNING *")
JobRow upsert(
UUID uuid, JobType type, Instant now, UUID namespaceUuid, String name, String description);
UUID uuid,
JobType type,
Instant now,
UUID namespaceUuid,
String namespaceName,
String name,
String description);
}
103 changes: 58 additions & 45 deletions api/src/main/java/marquez/db/JobVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.jdbi.v3.sqlobject.CreateSqlObject;
import org.jdbi.v3.sqlobject.SqlObject;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.BindBean;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import org.jdbi.v3.sqlobject.transaction.Transaction;
Expand All @@ -41,50 +42,51 @@ enum IoType {

@Transaction
default void insert(@NonNull JobVersionRow row) {
withHandle(
handle ->
handle
.createUpdate(
"INSERT INTO job_versions ("
+ "uuid, "
+ "created_at, "
+ "updated_at, "
+ "job_uuid, "
+ "version, "
+ "location, "
+ "latest_run_uuid, "
+ "job_context_uuid"
+ ") VALUES ("
+ ":uuid, "
+ ":createdAt, "
+ ":updateAt, "
+ ":jobUuid, "
+ ":version, "
+ ":location, "
+ ":latestRunUuid, "
+ ":jobContextUuid)")
.bindBean(row)
.execute());
insert_job_only(row);

// I/O
row.getInputUuids().forEach(inputUuid -> updateInputs(row.getUuid(), inputUuid));
row.getOutputUuids().forEach(outputUuid -> updateOutputs(row.getUuid(), outputUuid));
for (UUID inputUuid : row.getInputUuids()) {
updateInputsOrOutputs(row.getUuid(), inputUuid, IoType.INPUT.name());
}
for (UUID outputUuid : row.getOutputUuids()) {
updateInputsOrOutputs(row.getUuid(), outputUuid, IoType.OUTPUT.name());
}

// Version
final Instant updatedAt = row.getCreatedAt();
createJobDao().updateVersion(row.getJobUuid(), updatedAt, row.getUuid());
}

@SqlUpdate(
"INSERT INTO job_versions ("
+ "uuid, "
+ "created_at, "
+ "updated_at, "
+ "job_uuid, "
+ "version, "
+ "location, "
+ "latest_run_uuid, "
+ "job_name, "
+ "namespace_uuid, "
+ "namespace_name, "
+ "job_context_uuid"
+ ") VALUES ("
+ ":uuid, "
+ ":createdAt, "
+ ":updateAt, "
+ ":jobUuid, "
+ ":version, "
+ ":location, "
+ ":latestRunUuid, "
+ ":jobName, "
+ ":namespaceUuid, "
+ ":namespaceName, "
+ ":jobContextUuid)")
void insert_job_only(@BindBean JobVersionRow row);

@SqlQuery("SELECT EXISTS (SELECT 1 FROM job_versions WHERE version = :version)")
boolean exists(UUID version);

default void updateInputs(UUID versionUuid, UUID inputUuid) {
updateInputsOrOutputs(versionUuid, inputUuid, IoType.INPUT.name());
}

default void updateOutputs(UUID versionUuid, UUID outputUuid) {
updateInputsOrOutputs(versionUuid, outputUuid, IoType.OUTPUT.name());
}

@SqlUpdate(
"INSERT INTO job_versions_io_mapping (job_version_uuid, dataset_uuid, io_type) "
+ "VALUES (:versionUuid, :datasetUuid, :ioType)")
Expand All @@ -98,7 +100,7 @@ default void updateOutputs(UUID versionUuid, UUID outputUuid) {
void updateLatestRun(UUID rowUuid, Instant updatedAt, UUID latestRunUuid);

final String EXTENDED_SELECT =
"SELECT j.namespace_uuid, jv.*, jc.uuid AS job_context_uuid, jc.context, n.name as namespace_name, j.name, "
"SELECT jv.namespace_uuid, jv.*, jc.uuid AS job_context_uuid, jc.context, jv.namespace_name, jv.job_name as name, "
+ "ARRAY(SELECT dataset_uuid "
+ " FROM job_versions_io_mapping "
+ " WHERE job_version_uuid = jv.uuid AND "
Expand All @@ -108,10 +110,6 @@ default void updateOutputs(UUID versionUuid, UUID outputUuid) {
+ " WHERE job_version_uuid = jv.uuid AND "
+ " io_type = 'OUTPUT') AS output_uuids "
+ "FROM job_versions AS jv "
+ "INNER JOIN jobs AS j "
+ " ON j.uuid = jv.job_uuid "
+ "INNER JOIN namespaces AS n "
+ " ON j.namespace_uuid = n.uuid "
+ "INNER JOIN job_contexts AS jc "
+ " ON job_context_uuid = jc.uuid ";

Expand All @@ -120,7 +118,9 @@ default void updateOutputs(UUID versionUuid, UUID outputUuid) {

@SqlQuery(
EXTENDED_SELECT
+ "WHERE n.name = :namespaceName AND j.name = :jobName AND j.current_version_uuid = jv.uuid "
+ "INNER JOIN jobs AS j "
+ " ON j.uuid = jv.job_uuid "
+ "WHERE jv.namespace_name = :namespaceName AND jv.job_name = :jobName AND j.current_version_uuid = jv.uuid "
+ "ORDER BY created_at DESC "
+ "LIMIT 1")
Optional<ExtendedJobVersionRow> findLatest(String namespaceName, String jobName);
Expand All @@ -130,7 +130,7 @@ default void updateOutputs(UUID versionUuid, UUID outputUuid) {

@SqlQuery(
EXTENDED_SELECT
+ "WHERE n.name = :namespaceName AND j.name = :jobName "
+ "WHERE jv.namespace_name = :namespaceName AND jv.job_name = :jobName "
+ "ORDER BY created_at DESC "
+ "LIMIT :limit OFFSET :offset")
List<ExtendedJobVersionRow> findAll(String namespaceName, String jobName, int limit, int offset);
Expand All @@ -146,23 +146,36 @@ default void updateOutputs(UUID versionUuid, UUID outputUuid) {
+ "job_uuid, "
+ "job_context_uuid, "
+ "location,"
+ "version"
+ "version,"
+ "job_name,"
+ "namespace_uuid,"
+ "namespace_name"
+ ") VALUES ("
+ ":uuid, "
+ ":now, "
+ ":now, "
+ ":jobUuid, "
+ ":jobContextUuid, "
+ ":location, "
+ ":version) "
+ ":version, "
+ ":jobName, "
+ ":namespaceUuid, "
+ ":namespaceName) "
+ "ON CONFLICT(version) DO "
+ "UPDATE SET "
+ "updated_at = EXCLUDED.updated_at, "
+ "job_uuid = EXCLUDED.job_uuid, "
+ "job_context_uuid = EXCLUDED.job_context_uuid "
+ "RETURNING *")
ExtendedJobVersionRow upsert(
UUID uuid, Instant now, UUID jobUuid, UUID jobContextUuid, String location, UUID version);
UUID uuid,
Instant now,
UUID jobUuid,
UUID jobContextUuid,
String location,
UUID version,
String jobName,
UUID namespaceUuid,
String namespaceName);

@SqlUpdate(
"INSERT INTO job_versions_io_mapping ("
Expand Down
6 changes: 5 additions & 1 deletion api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event) {
getJobType(event.getJob()),
now,
namespace.getUuid(),
namespace.getName(),
event.getJob().getName(),
description);
bag.setJob(job);
Expand All @@ -128,7 +129,10 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event) {
job.getUuid(),
jobContext.getUuid(),
location,
buildJobVersion(event, context));
buildJobVersion(event, context),
job.getName(),
job.getNamespaceUuid(),
job.getNamespaceName());
bag.setJobVersion(jobVersion);

jobDao.updateVersion(job.getUuid(), Instant.now(), jobVersion.getUuid());
Expand Down
4 changes: 1 addition & 3 deletions api/src/main/java/marquez/db/RunDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,7 @@ default void updateJobVersionUuid(UUID rowUuid, Instant updatedAt, UUID jobVersi
@SqlQuery(
SELECT_RUN
+ "INNER JOIN job_versions AS jv ON r.job_version_uuid = jv.uuid "
+ "INNER JOIN jobs AS j ON jv.job_uuid = j.uuid "
+ "INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid "
+ "WHERE n.name = :namespace and j.name = :jobName "
+ "WHERE jv.namespace_name = :namespace and jv.job_name = :jobName "
+ "ORDER BY r.created_at DESC "
+ "LIMIT :limit OFFSET :offset")
List<ExtendedRunRow> findAll(String namespace, String jobName, int limit, int offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ public ExtendedJobVersionRow map(@NonNull ResultSet results, @NonNull StatementC
uuidOrThrow(results, Columns.VERSION),
uuidOrNull(results, Columns.LATEST_RUN_UUID),
columnNames.contains(Columns.CONTEXT) ? stringOrThrow(results, Columns.CONTEXT) : "",
columnNames.contains(Columns.NAMESPACE_NAME)
? stringOrThrow(results, Columns.NAMESPACE_NAME)
: "",
columnNames.contains(Columns.NAME) ? stringOrThrow(results, Columns.NAME) : "");
stringOrThrow(results, Columns.NAMESPACE_NAME),
columnNames.contains(Columns.NAME) ? stringOrThrow(results, Columns.NAME) : "",
uuidOrThrow(results, Columns.NAMESPACE_UUID));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,22 @@ public ExtendedJobVersionRow(
final UUID latestRunUuid,
@NonNull final String context,
@NonNull final String namespaceName,
@NonNull final String name) {
@NonNull final String name,
@NonNull final UUID namespaceUuid) {
super(
uuid,
createdAt,
updatedAt,
jobUuid,
name,
jobContextUuid,
inputUuids,
outputUuids,
location,
version,
latestRunUuid);
latestRunUuid,
namespaceUuid,
namespaceName);
this.context = context;
this.namespaceName = namespaceName;
this.name = name;
Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/models/JobVersionRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,15 @@ public class JobVersionRow {
@Getter @NonNull private final Instant createdAt;
@Getter @NonNull private final Instant updateAt;
@Getter @NonNull private final UUID jobUuid;
@Getter @NonNull private final String jobName;
@Getter @NonNull private final UUID jobContextUuid;
@Getter @NonNull private final List<UUID> inputUuids;
@Getter @NonNull private final List<UUID> outputUuids;
@Nullable private final String location;
@Getter @NonNull private final UUID version;
@Nullable private final UUID latestRunUuid;
@Getter @NonNull private final UUID namespaceUuid;
@Getter @NonNull private final String namespaceName;

public boolean hasInputUuids() {
return !inputUuids.isEmpty();
Expand Down
30 changes: 26 additions & 4 deletions api/src/main/java/marquez/service/JobService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.stream.Collectors.groupingBy;
import static marquez.db.OpenLineageDao.DEFAULT_NAMESPACE_OWNER;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -97,12 +98,17 @@ public JobService(
public Job createOrUpdate(
@NonNull NamespaceName namespaceName, @NonNull JobName jobName, @NonNull JobMeta jobMeta)
throws MarquezServiceException {
NamespaceRow namespace =
namespaceDao.upsert(
UUID.randomUUID(), Instant.now(), namespaceName.getValue(), DEFAULT_NAMESPACE_OWNER);
JobRow job = getOrCreateJobRow(namespaceName, jobName, jobMeta);

final Version jobVersion = jobMeta.version(namespaceName, jobName);
if (!jobVersionDao.exists(jobVersion.getValue())) {
UUID jobVersionUuid =
createJobVersion(job.getUuid(), jobVersion, namespaceName, jobName, jobMeta).getUuid();
createJobVersion(
job.getUuid(), jobVersion, namespace.getUuid(), namespaceName, jobName, jobMeta)
.getUuid();
updateRunFromJobMeta(jobMeta, jobVersionUuid, namespaceName, jobName);
// Get a new job as versions have been attached
return get(namespaceName, jobName).get();
Expand All @@ -127,6 +133,7 @@ private JobRow getOrCreateJobRow(NamespaceName namespaceName, JobName jobName, J
private JobVersionRow createJobVersion(
UUID jobId,
Version jobVersion,
UUID namespaceUuid,
NamespaceName namespaceName,
JobName jobName,
JobMeta jobMeta) {
Expand All @@ -143,7 +150,10 @@ private JobVersionRow createJobVersion(
mapDatasetToUuid(inputRows),
mapDatasetToUuid(outputRows),
jobMeta.getLocation().orElse(null),
jobVersion);
jobVersion,
jobName.getValue(),
namespaceUuid,
namespaceName.getValue());

JobMetrics.emitVersionMetric(
namespaceName.getValue(), jobMeta.getType().toString(), jobName.getValue());
Expand Down Expand Up @@ -215,9 +225,21 @@ private JobVersionRow createJobVersionRow(
List<UUID> input,
List<UUID> output,
URL location,
Version jobVersion) {
Version jobVersion,
String jobName,
UUID namespaceUuid,
String namespaceName) {
final JobVersionRow newJobVersionRow =
Mapper.toJobVersionRow(jobRowId, contextRowId, input, output, location, jobVersion);
Mapper.toJobVersionRow(
jobRowId,
jobName,
contextRowId,
input,
output,
location,
jobVersion,
namespaceUuid,
namespaceName);
jobVersionDao.insert(newJobVersionRow);

return newJobVersionRow;
Expand Down
Loading

0 comments on commit e91bc49

Please sign in to comment.