diff --git a/api/src/main/java/marquez/MarquezContext.java b/api/src/main/java/marquez/MarquezContext.java index 869da847dd..a92479368e 100644 --- a/api/src/main/java/marquez/MarquezContext.java +++ b/api/src/main/java/marquez/MarquezContext.java @@ -28,14 +28,15 @@ import marquez.db.DatasetDao; import marquez.db.DatasetFieldDao; import marquez.db.DatasetVersionDao; -import marquez.db.JobContextDao; import marquez.db.JobDao; +import marquez.db.JobFacetsDao; import marquez.db.JobVersionDao; import marquez.db.LineageDao; import marquez.db.NamespaceDao; import marquez.db.OpenLineageDao; import marquez.db.RunArgsDao; import marquez.db.RunDao; +import marquez.db.RunFacetsDao; import marquez.db.RunStateDao; import marquez.db.SearchDao; import marquez.db.SourceDao; @@ -67,9 +68,10 @@ public final class MarquezContext { @Getter private final DatasetVersionDao datasetVersionDao; @Getter private final JobDao jobDao; @Getter private final JobVersionDao jobVersionDao; - @Getter private final JobContextDao jobContextDao; + @Getter private final JobFacetsDao jobFacetsDao; @Getter private final RunDao runDao; @Getter private final RunArgsDao runArgsDao; + @Getter private final RunFacetsDao runFacetsDao; @Getter private final RunStateDao runStateDao; @Getter private final TagDao tagDao; @Getter private final OpenLineageDao openLineageDao; @@ -116,9 +118,10 @@ private MarquezContext( this.datasetVersionDao = jdbi.onDemand(DatasetVersionDao.class); this.jobDao = jdbi.onDemand(JobDao.class); this.jobVersionDao = jdbi.onDemand(JobVersionDao.class); - this.jobContextDao = jdbi.onDemand(JobContextDao.class); + this.jobFacetsDao = jdbi.onDemand(JobFacetsDao.class); this.runDao = jdbi.onDemand(RunDao.class); this.runArgsDao = jdbi.onDemand(RunArgsDao.class); + this.runFacetsDao = jdbi.onDemand(RunFacetsDao.class); this.runStateDao = jdbi.onDemand(RunStateDao.class); this.tagDao = jdbi.onDemand(TagDao.class); this.openLineageDao = jdbi.onDemand(OpenLineageDao.class); @@ -158,7 +161,7 @@ 
private MarquezContext( this.sourceResource = new SourceResource(serviceFactory); this.datasetResource = new DatasetResource(serviceFactory); this.columnLineageResource = new ColumnLineageResource(serviceFactory); - this.jobResource = new JobResource(serviceFactory, jobVersionDao); + this.jobResource = new JobResource(serviceFactory, jobVersionDao, jobFacetsDao, runFacetsDao); this.tagResource = new TagResource(serviceFactory); this.openLineageResource = new OpenLineageResource(serviceFactory, openLineageDao); this.searchResource = new SearchResource(searchDao); diff --git a/api/src/main/java/marquez/api/JobResource.java b/api/src/main/java/marquez/api/JobResource.java index 49cb3bc629..0a15b6d76f 100644 --- a/api/src/main/java/marquez/api/JobResource.java +++ b/api/src/main/java/marquez/api/JobResource.java @@ -15,6 +15,7 @@ import java.util.List; import javax.validation.Valid; import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; @@ -37,11 +38,14 @@ import marquez.api.exceptions.JobVersionNotFoundException; import marquez.api.models.JobVersion; import marquez.api.models.ResultsPage; +import marquez.common.models.FacetType; import marquez.common.models.JobName; import marquez.common.models.NamespaceName; import marquez.common.models.RunId; import marquez.common.models.Version; +import marquez.db.JobFacetsDao; import marquez.db.JobVersionDao; +import marquez.db.RunFacetsDao; import marquez.db.models.JobRow; import marquez.service.ServiceFactory; import marquez.service.models.Job; @@ -52,11 +56,18 @@ @Path("/api/v1") public class JobResource extends BaseResource { private final JobVersionDao jobVersionDao; + private final JobFacetsDao jobFacetsDao; + private final RunFacetsDao runFacetsDao; public JobResource( - @NonNull final ServiceFactory serviceFactory, @NonNull final JobVersionDao jobVersionDao) { + @NonNull final ServiceFactory 
serviceFactory, + @NonNull final JobVersionDao jobVersionDao, + @NonNull JobFacetsDao jobFacetsDao, + @NonNull RunFacetsDao runFacetsDao) { super(serviceFactory); this.jobVersionDao = jobVersionDao; + this.jobFacetsDao = jobFacetsDao; + this.runFacetsDao = runFacetsDao; } /** @@ -236,6 +247,33 @@ public RunResource runResourceRoot(@PathParam("id") RunId runId) { return new RunResource(runId, runService); } + @Timed + @ResponseMetered + @ExceptionMetered + @GET + @Produces(APPLICATION_JSON) + @Path("/jobs/runs/{id}/facets") + public Response getRunFacets( + @PathParam("id") RunId runId, @QueryParam("type") @NotNull FacetType type) { + throwIfNotExists(runId); + Object facets = null; + switch (type) { + case JOB: + facets = jobFacetsDao.findJobFacetsByRunUuid(runId.getValue()); + break; + case RUN: + facets = runFacetsDao.findRunFacetsByRunUuid(runId.getValue()); + break; + case DATASET: + // for future case if there's a need to add dataset facets to the endpoint + break; + default: + break; + } + + return Response.ok(facets).build(); + } + @Value static class JobVersions { @NonNull diff --git a/api/src/main/java/marquez/api/models/JobVersion.java b/api/src/main/java/marquez/api/models/JobVersion.java index c04f86b363..d7fbd6597c 100644 --- a/api/src/main/java/marquez/api/models/JobVersion.java +++ b/api/src/main/java/marquez/api/models/JobVersion.java @@ -5,7 +5,6 @@ package marquez.api.models; -import com.google.common.collect.ImmutableMap; import java.net.URL; import java.time.Instant; import java.util.List; @@ -35,7 +34,6 @@ public final class JobVersion { @Getter private final Version version; @Getter private final NamespaceName namespace; @Nullable private final URL location; - @Getter private final ImmutableMap context; @Getter private final List inputs; @Getter private final List outputs; @Getter @Nullable private final Run latestRun; @@ -46,7 +44,6 @@ public JobVersion( @NonNull final Instant createdAt, @NonNull final Version version, @Nullable final URL 
location, - @Nullable final ImmutableMap context, List inputs, List outputs, @Nullable Run latestRun) { @@ -56,7 +53,6 @@ public JobVersion( this.version = version; this.namespace = id.getNamespace(); this.location = location; - this.context = (context == null) ? ImmutableMap.of() : context; this.inputs = inputs; this.outputs = outputs; this.latestRun = latestRun; diff --git a/api/src/main/java/marquez/common/Utils.java b/api/src/main/java/marquez/common/Utils.java index 3336d8f0ce..83a07416be 100644 --- a/api/src/main/java/marquez/common/Utils.java +++ b/api/src/main/java/marquez/common/Utils.java @@ -20,7 +20,6 @@ import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.hash.Hashing; import io.dropwizard.jackson.Jackson; @@ -236,7 +235,6 @@ public static Instant toInstant(@Nullable final String asIso) { * @param jobName The name of the job. * @param jobInputIds The input dataset IDs for the job. * @param jobOutputIds The output dataset IDs for the job. - * @param jobContext The context of the job. * @param jobLocation The source code location for the job. * @return A {@link Version} object based on the specified job meta. 
*/ @@ -245,7 +243,6 @@ public static Version newJobVersionFor( @NonNull final JobName jobName, @NonNull final ImmutableSet jobInputIds, @NonNull final ImmutableSet jobOutputIds, - @NonNull final ImmutableMap jobContext, @Nullable final String jobLocation) { final byte[] bytes = VERSION_JOINER @@ -268,8 +265,7 @@ public static Version newJobVersionFor( jobOutputId.getNamespace().getValue(), jobOutputId.getName().getValue())) .collect(joining(VERSION_DELIM)), - jobLocation, - KV_JOINER.join(jobContext)) + jobLocation) .getBytes(UTF_8); return Version.of(UUID.nameUUIDFromBytes(bytes)); } diff --git a/api/src/main/java/marquez/common/models/DatasetId.java b/api/src/main/java/marquez/common/models/DatasetId.java index 98a9c2dc48..bf36e8c44c 100644 --- a/api/src/main/java/marquez/common/models/DatasetId.java +++ b/api/src/main/java/marquez/common/models/DatasetId.java @@ -7,7 +7,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ComparisonChain; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import lombok.EqualsAndHashCode; import lombok.NonNull; @@ -16,8 +15,8 @@ /** * ID for {@code Dataset}. The class implements {@link Comparable} to ensure job versions generated - * with {@link Utils#newJobVersionFor(NamespaceName, JobName, ImmutableSet, ImmutableSet, - * ImmutableMap, String)} are consistent as jobs may contain inputs and outputs out of order. + * with {@link Utils#newJobVersionFor(NamespaceName, JobName, ImmutableSet, ImmutableSet, String)} + * are consistent as jobs may contain inputs and outputs out of order. 
*/ @EqualsAndHashCode @ToString diff --git a/api/src/main/java/marquez/common/models/FacetType.java b/api/src/main/java/marquez/common/models/FacetType.java new file mode 100644 index 0000000000..e9df74173b --- /dev/null +++ b/api/src/main/java/marquez/common/models/FacetType.java @@ -0,0 +1,12 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.common.models; + +public enum FacetType { + RUN, + JOB, + DATASET; +} diff --git a/api/src/main/java/marquez/db/BaseDao.java b/api/src/main/java/marquez/db/BaseDao.java index 94f7dfc257..ee6f37d84b 100644 --- a/api/src/main/java/marquez/db/BaseDao.java +++ b/api/src/main/java/marquez/db/BaseDao.java @@ -18,9 +18,6 @@ public interface BaseDao extends SqlObject { @CreateSqlObject DatasetVersionDao createDatasetVersionDao(); - @CreateSqlObject - JobContextDao createJobContextDao(); - @CreateSqlObject JobDao createJobDao(); diff --git a/api/src/main/java/marquez/db/Columns.java b/api/src/main/java/marquez/db/Columns.java index 3a3ddeb40d..fca22cff50 100644 --- a/api/src/main/java/marquez/db/Columns.java +++ b/api/src/main/java/marquez/db/Columns.java @@ -103,13 +103,9 @@ private Columns() {} /* JOB VERSION ROW COLUMNS */ public static final String JOB_UUID = "job_uuid"; - public static final String JOB_CONTEXT_UUID = "job_context_uuid"; public static final String LOCATION = "location"; public static final String LATEST_RUN_UUID = "latest_run_uuid"; - /* JOB CONTEXT ROW COLUMNS */ - public static final String CONTEXT = "context"; - /* RUN ROW COLUMNS */ public static final String EXTERNAL_ID = "external_id"; public static final String RUN_ARGS_UUID = "run_args_uuid"; diff --git a/api/src/main/java/marquez/db/JobContextDao.java b/api/src/main/java/marquez/db/JobContextDao.java deleted file mode 100644 index 139bfee71d..0000000000 --- a/api/src/main/java/marquez/db/JobContextDao.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2018-2023 
contributors to the Marquez project - * SPDX-License-Identifier: Apache-2.0 - */ - -package marquez.db; - -import java.time.Instant; -import java.util.Optional; -import java.util.UUID; -import marquez.db.mappers.JobContextRowMapper; -import marquez.db.models.JobContextRow; -import org.jdbi.v3.sqlobject.config.RegisterRowMapper; -import org.jdbi.v3.sqlobject.statement.SqlQuery; -import org.jdbi.v3.sqlobject.statement.SqlUpdate; - -@RegisterRowMapper(JobContextRowMapper.class) -public interface JobContextDao { - @SqlQuery("SELECT * FROM job_contexts WHERE uuid = :uuid") - Optional findContextByUuid(UUID uuid); - - @SqlQuery("SELECT * FROM job_contexts WHERE checksum=:checksum") - Optional findContextByChecksum(String checksum); - - default JobContextRow upsert(UUID uuid, Instant now, String context, String checksum) { - doUpsert(uuid, now, context, checksum); - return findContextByChecksum(checksum).orElseThrow(); - } - - @SqlUpdate( - """ - INSERT INTO job_contexts - (uuid, created_at, context, checksum) - VALUES - (:uuid, :now, :context, :checksum) - ON CONFLICT (checksum) DO NOTHING - """) - void doUpsert(UUID uuid, Instant now, String context, String checksum); -} diff --git a/api/src/main/java/marquez/db/JobDao.java b/api/src/main/java/marquez/db/JobDao.java index aa5ca44832..89a29edbea 100644 --- a/api/src/main/java/marquez/db/JobDao.java +++ b/api/src/main/java/marquez/db/JobDao.java @@ -15,7 +15,6 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; -import marquez.common.Utils; import marquez.common.models.DatasetId; import marquez.common.models.DatasetName; import marquez.common.models.JobName; @@ -23,7 +22,6 @@ import marquez.common.models.NamespaceName; import marquez.db.mappers.JobMapper; import marquez.db.mappers.JobRowMapper; -import marquez.db.models.JobContextRow; import marquez.db.models.JobRow; import marquez.db.models.NamespaceRow; import marquez.service.models.Job; @@ -58,10 +56,9 @@ SELECT EXISTS ( @SqlQuery( """ - 
SELECT j.*, jc.context, f.facets + SELECT j.*, f.facets FROM jobs_view j LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid - LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid LEFT OUTER JOIN ( SELECT run_uuid, JSON_AGG(e.facet) AS facets FROM ( @@ -130,10 +127,9 @@ default Optional findWithRun(String namespaceName, String jobName) { @SqlQuery( """ - SELECT j.*, jc.context, f.facets + SELECT j.*, f.facets FROM jobs_view AS j LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid - LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid LEFT OUTER JOIN ( SELECT run_uuid, JSON_AGG(e.facet) AS facets FROM ( @@ -208,13 +204,6 @@ default JobRow upsertJobMeta( createNamespaceDao() .upsertNamespaceRow( UUID.randomUUID(), createdAt, namespaceName.getValue(), DEFAULT_NAMESPACE_OWNER); - JobContextRow contextRow = - createJobContextDao() - .upsert( - UUID.randomUUID(), - createdAt, - Utils.toJson(jobMeta.getContext()), - Utils.checksumFor(jobMeta.getContext())); return upsertJob( UUID.randomUUID(), jobMeta.getType(), @@ -223,7 +212,6 @@ default JobRow upsertJobMeta( namespace.getName(), jobName.getValue(), jobMeta.getDescription().orElse(null), - contextRow.getUuid(), toUrlString(jobMeta.getLocation().orElse(null)), symlinkTargetUuid, toJson(jobMeta.getInputs(), mapper)); @@ -276,7 +264,7 @@ INSERT INTO jobs_view AS j ( :namespaceName, :name, :description, - :jobContextUuid, + null, :location, :inputs, :symlinkTargetId, @@ -291,7 +279,6 @@ JobRow upsertJob( String namespaceName, String name, String description, - UUID jobContextUuid, String location, UUID symlinkTargetId, PGobject inputs); @@ -328,7 +315,7 @@ INSERT INTO jobs_view AS j ( :namespaceName, :name, :description, - :jobContextUuid, + null, :location, :inputs, :symlinkTargetId @@ -344,7 +331,6 @@ JobRow upsertJob( String namespaceName, String name, String description, - UUID jobContextUuid, String location, UUID symlinkTargetId, PGobject 
inputs); diff --git a/api/src/main/java/marquez/db/JobFacetsDao.java b/api/src/main/java/marquez/db/JobFacetsDao.java index 95fc934b4a..53518681aa 100644 --- a/api/src/main/java/marquez/db/JobFacetsDao.java +++ b/api/src/main/java/marquez/db/JobFacetsDao.java @@ -13,34 +13,39 @@ import java.util.stream.StreamSupport; import lombok.NonNull; import marquez.common.Utils; +import marquez.db.mappers.JobFacetsMapper; +import marquez.service.models.JobFacets; import marquez.service.models.LineageEvent; +import org.jdbi.v3.sqlobject.config.RegisterRowMapper; +import org.jdbi.v3.sqlobject.statement.SqlQuery; import org.jdbi.v3.sqlobject.statement.SqlUpdate; import org.jdbi.v3.sqlobject.transaction.Transaction; import org.postgresql.util.PGobject; +@RegisterRowMapper(JobFacetsMapper.class) /** The DAO for {@code job} facets. */ public interface JobFacetsDao { @SqlUpdate( """ - INSERT INTO job_facets ( - created_at, - job_uuid, - run_uuid, - lineage_event_time, - lineage_event_type, - name, - facet - ) VALUES ( - :createdAt, - :jobUuid, - :runUuid, - :lineageEventTime, - :lineageEventType, - :name, - :facet - ) - """) + INSERT INTO job_facets ( + created_at, + job_uuid, + run_uuid, + lineage_event_time, + lineage_event_type, + name, + facet + ) VALUES ( + :createdAt, + :jobUuid, + :runUuid, + :lineageEventTime, + :lineageEventType, + :name, + :facet + ) + """) void insertJobFacet( Instant createdAt, UUID jobUuid, @@ -50,6 +55,20 @@ void insertJobFacet( String name, PGobject facet); + @SqlQuery( + """ + SELECT + run_uuid, + JSON_AGG(facet ORDER BY lineage_event_time) AS facets + FROM + job_facets_view + WHERE + run_uuid = :runUuid + GROUP BY + run_uuid + """) + JobFacets findJobFacetsByRunUuid(UUID runUuid); + @Transaction default void insertJobFacetsFor( @NonNull UUID jobUuid, diff --git a/api/src/main/java/marquez/db/JobVersionDao.java b/api/src/main/java/marquez/db/JobVersionDao.java index 33bbaa725a..597414d2e8 100644 --- a/api/src/main/java/marquez/db/JobVersionDao.java 
+++ b/api/src/main/java/marquez/db/JobVersionDao.java @@ -5,9 +5,7 @@ package marquez.db; -import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedSet; import java.time.Instant; @@ -28,7 +26,6 @@ import marquez.db.mappers.JobVersionMapper; import marquez.db.models.ExtendedDatasetVersionRow; import marquez.db.models.ExtendedJobVersionRow; -import marquez.db.models.JobContextRow; import marquez.db.models.JobRow; import marquez.db.models.JobVersionRow; import marquez.db.models.NamespaceRow; @@ -70,10 +67,9 @@ WITH job_version_io AS ( GROUP BY io.job_version_uuid ), relevant_job_versions AS ( SELECT jv.uuid, jv.created_at, jv.updated_at, jv.job_uuid, jv.version,\s - jv.location, jv.latest_run_uuid, jv.job_context_uuid, j.namespace_uuid,\s - j.namespace_name, j.name AS job_name, jc.context + jv.location, jv.latest_run_uuid, j.namespace_uuid,\s + j.namespace_name, j.name AS job_name FROM job_versions jv - LEFT OUTER JOIN job_contexts AS jc ON jc.uuid = jv.job_context_uuid INNER JOIN jobs_view j ON j.uuid=jv.job_uuid WHERE j.name = :jobName AND j.namespace_name=:namespaceName ORDER BY jv.created_at DESC @@ -94,7 +90,6 @@ WITH job_version_io AS ( jv.version AS run_job_version, r.location AS run_location, ra.args AS run_args, - jv.context AS run_context, f.facets AS run_facets, ri.input_versions AS run_input_versions, ro.output_versions AS run_output_versions @@ -141,7 +136,6 @@ LEFT OUTER JOIN ( * @param jobVersionUuid The unique ID of the job version. * @param now The last modified timestamp of the job version. * @param jobUuid The unique ID of the job associated with the version. - * @param jobContextUuid The unique ID of the job context associated with the version. * @param jobLocation The source code location for the job. 
* @param version The version of the job; for internal use only. * @param jobName The name of the job. @@ -157,7 +151,6 @@ INSERT INTO job_versions ( created_at, updated_at, job_uuid, - job_context_uuid, location, version, job_name, @@ -168,7 +161,6 @@ INSERT INTO job_versions ( :now, :now, :jobUuid, - :jobContextUuid, :jobLocation, :version, :jobName, @@ -182,7 +174,6 @@ ExtendedJobVersionRow upsertJobVersion( UUID jobVersionUuid, Instant now, UUID jobUuid, - UUID jobContextUuid, String jobLocation, UUID version, String jobName, @@ -295,9 +286,9 @@ default List findOutputDatasetsFor(UUID jobVersionUuid) { /** * Used to upsert an immutable {@link JobVersionRow} object when a {@link Run} has transitioned. A * {@link Version} is generated using {@link Utils#newJobVersionFor(NamespaceName, JobName, - * ImmutableSet, ImmutableSet, ImmutableMap, String)} based on the jobs inputs and inputs, source - * code location, and context. A version for a given job is created only when a {@link Run} - * transitions into a {@code COMPLETED}, {@code ABORTED}, or {@code FAILED} state. + * ImmutableSet, ImmutableSet, String)} based on the jobs inputs and inputs, source code location, + * and context. A version for a given job is created only when a {@link Run} transitions + * into a {@code COMPLETED}, {@code ABORTED}, or {@code FAILED} state. * * @param jobRow The job. * @param runUuid The unique ID of the run associated with the job version. @@ -313,13 +304,6 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition( // Get the job. final JobDao jobDao = createJobDao(); - // Get the job context. - final UUID jobContextUuid = jobRow.getJobContextUuid().get(); - final JobContextRow jobContextRow = - createJobContextDao().findContextByUuid(jobContextUuid).get(); - final ImmutableMap jobContext = - Utils.fromJson(jobContextRow.getContext(), new TypeReference<>() {}); - // Get the inputs and outputs dataset versions for the run associated with the job version. 
final DatasetVersionDao datasetVersionDao = createDatasetVersionDao(); final List jobVersionInputs = @@ -341,7 +325,6 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition( .orElse(jobRow.getName())), toDatasetIds(jobVersionInputs), toDatasetIds(jobVersionOutputs), - jobContext, jobRow.getLocation()); // Add the job version. @@ -351,7 +334,6 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition( UUID.randomUUID(), transitionedAt, // Use the timestamp of when the run state transitioned. jobRow.getUuid(), - jobContextUuid, jobRow.getLocation(), jobVersion.getValue(), jobRow.getName(), diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index c9ec8afa41..a0a5835c16 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -40,7 +40,6 @@ import marquez.db.models.DatasetSymlinkRow; import marquez.db.models.DatasetVersionRow; import marquez.db.models.InputFieldData; -import marquez.db.models.JobContextRow; import marquez.db.models.JobRow; import marquez.db.models.NamespaceRow; import marquez.db.models.RunArgsRow; @@ -132,7 +131,6 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper DatasetDao datasetDao = createDatasetDao(); SourceDao sourceDao = createSourceDao(); JobDao jobDao = createJobDao(); - JobContextDao jobContextDao = createJobContextDao(); JobFacetsDao jobFacetsDao = createJobFacetsDao(); DatasetVersionDao datasetVersionDao = createDatasetVersionDao(); DatasetFieldDao datasetFieldDao = createDatasetFieldDao(); @@ -154,12 +152,6 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper DEFAULT_NAMESPACE_OWNER); bag.setNamespace(namespace); - Map context = buildJobContext(event); - JobContextRow jobContext = - jobContextDao.upsert( - UUID.randomUUID(), now, Utils.toJson(context), Utils.checksumFor(context)); - bag.setJobContext(jobContext); - Instant nominalStartTime 
= Optional.ofNullable(event.getRun().getFacets()) .flatMap(f -> Optional.ofNullable(f.getNominalTime())) @@ -188,7 +180,6 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper jobDao, now, namespace, - jobContext, nominalStartTime, nominalEndTime, parentRun)); @@ -221,8 +212,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper now, namespace.getName(), job.getName(), - job.getLocation(), - jobContext.getUuid()); + job.getLocation()); // Add ... Optional.ofNullable(event.getRun().getFacets()) .ifPresent( @@ -244,8 +234,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper namespace.getUuid(), namespace.getName(), job.getName(), - job.getLocation(), - jobContext.getUuid()); + job.getLocation()); } bag.setRun(run); @@ -349,7 +338,6 @@ private JobRow buildJobFromEvent( JobDao jobDao, Instant now, NamespaceRow namespace, - JobContextRow jobContext, Instant nominalStartTime, Instant nominalEndTime, Optional parentRun) { @@ -373,7 +361,6 @@ private JobRow buildJobFromEvent( findParentJobRow( event, namespace, - jobContext, location, nominalStartTime, nominalEndTime, @@ -410,7 +397,6 @@ private JobRow buildJobFromEvent( namespace.getName(), jobName, description, - jobContext.getUuid(), location, null, jobDao.toJson(toDatasetId(event.getInputs()), mapper))) @@ -424,7 +410,6 @@ private JobRow buildJobFromEvent( namespace.getName(), jobName, description, - jobContext.getUuid(), location, null, jobDao.toJson(toDatasetId(event.getInputs()), mapper))); @@ -433,7 +418,6 @@ private JobRow buildJobFromEvent( private JobRow findParentJobRow( LineageEvent event, NamespaceRow namespace, - JobContextRow jobContext, String location, Instant nominalStartTime, Instant nominalEndTime, @@ -475,7 +459,6 @@ private JobRow findParentJobRow( return createParentJobRunRecord( event, namespace, - jobContext, location, nominalStartTime, nominalEndTime, @@ -489,7 +472,6 @@ private JobRow findParentJobRow( 
createParentJobRunRecord( event, namespace, - jobContext, location, nominalStartTime, nominalEndTime, @@ -506,7 +488,6 @@ private JobRow findParentJobRow( private JobRow createParentJobRunRecord( LineageEvent event, NamespaceRow namespace, - JobContextRow jobContext, String location, Instant nominalStartTime, Instant nominalEndTime, @@ -529,7 +510,6 @@ private JobRow createParentJobRunRecord( namespace.getName(), parentJobName, null, - jobContext.getUuid(), location, null, inputs); @@ -556,8 +536,7 @@ private JobRow createParentJobRunRecord( now, namespace.getName(), newParentJobRow.getName(), - newParentJobRow.getLocation(), - newParentJobRow.getJobContextUuid().orElse(null)); + newParentJobRow.getLocation()); log.info("Created new parent run record {}", newRow); runState @@ -921,29 +900,6 @@ default Map createRunArgs(LineageEvent event) { return args; } - default Map buildJobContext(LineageEvent event) { - Map args = new LinkedHashMap<>(); - if (event.getJob().getFacets() != null) { - if (event.getJob().getFacets().getSourceCodeLocation() != null) { - if (event.getJob().getFacets().getSourceCodeLocation().getType() != null) { - args.put( - "job.facets.sourceCodeLocation.type", - event.getJob().getFacets().getSourceCodeLocation().getType()); - } - if (event.getJob().getFacets().getSourceCodeLocation().getUrl() != null) { - args.put( - "job.facets.sourceCodeLocation.url", - event.getJob().getFacets().getSourceCodeLocation().getUrl()); - } - } - if (event.getJob().getFacets().getSql() != null) { - args.put("sql", event.getJob().getFacets().getSql().getQuery()); - } - } - - return args; - } - default UUID runToUuid(String runId) { try { return UUID.fromString(runId); diff --git a/api/src/main/java/marquez/db/RunDao.java b/api/src/main/java/marquez/db/RunDao.java index 13527f93be..a423af0d6b 100644 --- a/api/src/main/java/marquez/db/RunDao.java +++ b/api/src/main/java/marquez/db/RunDao.java @@ -74,7 +74,7 @@ public interface RunDao extends BaseDao { void 
updateEndState(UUID rowUuid, Instant transitionedAt, UUID endRunStateUuid); String BASE_FIND_RUN_SQL = - "SELECT r.*, ra.args, ctx.context, f.facets,\n" + "SELECT r.*, ra.args, f.facets,\n" + "jv.version AS job_version,\n" + "ri.input_versions, ro.output_versions\n" + "FROM runs_view AS r\n" @@ -85,7 +85,6 @@ public interface RunDao extends BaseDao { + " GROUP BY rf.run_uuid\n" + ") AS f ON r.uuid=f.run_uuid\n" + "LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n" - + "LEFT OUTER JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid\n" + "LEFT OUTER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n" + "LEFT OUTER JOIN (\n" + " SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name,\n" @@ -122,7 +121,7 @@ public interface RunDao extends BaseDao { @SqlQuery( """ - SELECT r.*, ra.args, ctx.context, f.facets, + SELECT r.*, ra.args, f.facets, j.namespace_name, j.name, jv.version AS job_version, ri.input_versions, ro.output_versions FROM runs_view AS r @@ -135,7 +134,6 @@ SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS fac GROUP BY rf.run_uuid ) AS f ON r.uuid=f.run_uuid LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid - LEFT OUTER JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid LEFT OUTER JOIN job_versions jv ON jv.uuid=r.job_version_uuid LEFT OUTER JOIN ( SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name, @@ -192,7 +190,7 @@ SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name, + ":namespaceName, " + ":jobName, " + ":location, " - + ":jobContextUuid " + + "null " + ") ON CONFLICT(uuid) DO " + "UPDATE SET " + "external_id = EXCLUDED.external_id, " @@ -217,8 +215,7 @@ RunRow upsert( Instant runStateTime, String namespaceName, String jobName, - String location, - UUID jobContextUuid); + String location); @SqlQuery( "INSERT INTO runs ( " @@ -250,7 +247,7 @@ RunRow upsert( + ":namespaceName, " + ":jobName, " + ":location, " - + 
":jobContextUuid " + + "null" + ") ON CONFLICT(uuid) DO " + "UPDATE SET " + "external_id = EXCLUDED.external_id, " @@ -272,8 +269,7 @@ RunRow upsert( UUID namespaceUuid, String namespaceName, String jobName, - String location, - UUID jobContextUuid); + String location); @SqlUpdate( "INSERT INTO runs_input_mapping (run_uuid, dataset_version_uuid) " @@ -402,8 +398,7 @@ default RunRow upsertRunMeta( now, namespaceRow.getName(), jobRow.getName(), - jobRow.getLocation(), - jobRow.getJobContextUuid().orElse(null)); + jobRow.getLocation()); updateInputDatasetMapping(jobRow.getInputs(), uuid); diff --git a/api/src/main/java/marquez/db/RunFacetsDao.java b/api/src/main/java/marquez/db/RunFacetsDao.java index 3a8a6d8be2..0af844b156 100644 --- a/api/src/main/java/marquez/db/RunFacetsDao.java +++ b/api/src/main/java/marquez/db/RunFacetsDao.java @@ -13,7 +13,10 @@ import java.util.stream.StreamSupport; import lombok.NonNull; import marquez.common.Utils; +import marquez.db.mappers.RunFacetsMapper; import marquez.service.models.LineageEvent; +import marquez.service.models.RunFacets; +import org.jdbi.v3.sqlobject.config.RegisterRowMapper; import org.jdbi.v3.sqlobject.statement.SqlQuery; import org.jdbi.v3.sqlobject.statement.SqlUpdate; import org.jdbi.v3.sqlobject.transaction.Transaction; @@ -21,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RegisterRowMapper(RunFacetsMapper.class) /** The DAO for {@code run} facets. 
*/ public interface RunFacetsDao { Logger log = LoggerFactory.getLogger(RunFacetsDao.class); @@ -64,6 +68,23 @@ void insertRunFacet( @SqlQuery("SELECT EXISTS (SELECT 1 FROM run_facets WHERE name = :name AND run_uuid = :runUuid)") boolean runFacetExists(String name, UUID runUuid); + /** + * @param runUuid + */ + @SqlQuery( + """ + SELECT + run_uuid, + JSON_AGG(facet ORDER BY lineage_event_time) AS facets + FROM + run_facets_view + WHERE + run_uuid = :runUuid + GROUP BY + run_uuid + """) + RunFacets findRunFacetsByRunUuid(UUID runUuid); + /** * @param runUuid * @param lineageEventTime diff --git a/api/src/main/java/marquez/db/mappers/ExtendedJobVersionRowMapper.java b/api/src/main/java/marquez/db/mappers/ExtendedJobVersionRowMapper.java index 3323dd363b..e665ac9389 100644 --- a/api/src/main/java/marquez/db/mappers/ExtendedJobVersionRowMapper.java +++ b/api/src/main/java/marquez/db/mappers/ExtendedJobVersionRowMapper.java @@ -34,7 +34,6 @@ public ExtendedJobVersionRow map(@NonNull ResultSet results, @NonNull StatementC timestampOrThrow(results, Columns.CREATED_AT), timestampOrThrow(results, Columns.UPDATED_AT), uuidOrThrow(results, Columns.JOB_UUID), - uuidOrThrow(results, Columns.JOB_CONTEXT_UUID), columnNames.contains(Columns.INPUT_UUIDS) ? uuidArrayOrThrow(results, Columns.INPUT_UUIDS) : ImmutableList.of(), @@ -44,7 +43,6 @@ public ExtendedJobVersionRow map(@NonNull ResultSet results, @NonNull StatementC stringOrNull(results, Columns.LOCATION), uuidOrThrow(results, Columns.VERSION), uuidOrNull(results, Columns.LATEST_RUN_UUID), - columnNames.contains(Columns.CONTEXT) ? 
stringOrThrow(results, Columns.CONTEXT) : "", stringOrThrow(results, Columns.NAMESPACE_NAME), stringOrThrow(results, Columns.JOB_NAME), uuidOrThrow(results, Columns.NAMESPACE_UUID)); diff --git a/api/src/main/java/marquez/db/mappers/JobContextRowMapper.java b/api/src/main/java/marquez/db/mappers/JobContextRowMapper.java deleted file mode 100644 index a53031d946..0000000000 --- a/api/src/main/java/marquez/db/mappers/JobContextRowMapper.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2018-2023 contributors to the Marquez project - * SPDX-License-Identifier: Apache-2.0 - */ - -package marquez.db.mappers; - -import static marquez.db.Columns.stringOrThrow; -import static marquez.db.Columns.timestampOrThrow; -import static marquez.db.Columns.uuidOrThrow; - -import java.sql.ResultSet; -import java.sql.SQLException; -import lombok.NonNull; -import marquez.db.Columns; -import marquez.db.models.JobContextRow; -import org.jdbi.v3.core.mapper.RowMapper; -import org.jdbi.v3.core.statement.StatementContext; - -public final class JobContextRowMapper implements RowMapper { - @Override - public JobContextRow map(@NonNull ResultSet results, @NonNull StatementContext context) - throws SQLException { - return new JobContextRow( - uuidOrThrow(results, Columns.ROW_UUID), - timestampOrThrow(results, Columns.CREATED_AT), - stringOrThrow(results, Columns.CONTEXT), - stringOrThrow(results, Columns.CHECKSUM)); - } -} diff --git a/api/src/main/java/marquez/db/mappers/JobDataMapper.java b/api/src/main/java/marquez/db/mappers/JobDataMapper.java index 2704abe2a6..1dc9c5b563 100644 --- a/api/src/main/java/marquez/db/mappers/JobDataMapper.java +++ b/api/src/main/java/marquez/db/mappers/JobDataMapper.java @@ -12,13 +12,10 @@ import static marquez.db.Columns.uuidArrayOrEmpty; import static marquez.db.Columns.uuidOrThrow; -import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import 
java.sql.ResultSet; import java.sql.SQLException; import lombok.NonNull; -import marquez.common.Utils; import marquez.common.models.DatasetId; import marquez.common.models.JobId; import marquez.common.models.JobName; @@ -50,17 +47,7 @@ public JobData map(@NonNull ResultSet results, @NonNull StatementContext context ImmutableSet.of(), ImmutableSet.copyOf(uuidArrayOrEmpty(results, Columns.OUTPUT_UUIDS)), urlOrNull(results, "current_location"), - toContext(results, Columns.CONTEXT), stringOrNull(results, Columns.DESCRIPTION), null); } - - public static ImmutableMap toContext(ResultSet results, String column) - throws SQLException { - if (results.getString(column) == null) { - return null; - } - return Utils.fromJson( - results.getString(column), new TypeReference>() {}); - } } diff --git a/api/src/main/java/marquez/db/mappers/JobFacetsMapper.java b/api/src/main/java/marquez/db/mappers/JobFacetsMapper.java new file mode 100644 index 0000000000..b84a0ab6f8 --- /dev/null +++ b/api/src/main/java/marquez/db/mappers/JobFacetsMapper.java @@ -0,0 +1,25 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.mappers; + +import static marquez.db.Columns.uuidOrNull; +import static marquez.db.mappers.MapperUtils.toFacetsOrNull; + +import java.sql.ResultSet; +import java.sql.SQLException; +import lombok.NonNull; +import marquez.service.models.JobFacets; +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.core.statement.StatementContext; + +public class JobFacetsMapper implements RowMapper { + + @Override + public JobFacets map(@NonNull ResultSet results, @NonNull StatementContext context) + throws SQLException { + return new JobFacets(uuidOrNull(results, "run_uuid"), toFacetsOrNull(results, "facets")); + } +} diff --git a/api/src/main/java/marquez/db/mappers/JobMapper.java b/api/src/main/java/marquez/db/mappers/JobMapper.java index 3d2b7568b4..c331dc413b 100644 --- 
a/api/src/main/java/marquez/db/mappers/JobMapper.java +++ b/api/src/main/java/marquez/db/mappers/JobMapper.java @@ -15,7 +15,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableMap; import java.sql.ResultSet; import java.sql.SQLException; import java.util.HashSet; @@ -54,7 +53,6 @@ public Job map(@NonNull ResultSet results, @NonNull StatementContext context) getDatasetFromJsonOrNull(results, "current_inputs"), new HashSet<>(), urlOrNull(results, "current_location"), - toContext(results, Columns.CONTEXT), stringOrNull(results, Columns.DESCRIPTION), // Latest Run is resolved in the JobDao. This can be brought in via a join and // and a jsonb but custom deserializers will need to be introduced @@ -63,14 +61,6 @@ public Job map(@NonNull ResultSet results, @NonNull StatementContext context) uuidOrNull(results, Columns.CURRENT_VERSION_UUID)); } - public static ImmutableMap toContext(ResultSet results, String column) - throws SQLException { - if (results.getString(column) == null) { - return null; - } - return Utils.fromJson(results.getString(column), new TypeReference<>() {}); - } - Set getDatasetFromJsonOrNull(@NonNull ResultSet results, String column) throws SQLException { if (results.getObject(column) == null) { diff --git a/api/src/main/java/marquez/db/mappers/JobRowMapper.java b/api/src/main/java/marquez/db/mappers/JobRowMapper.java index 1e097c2fcf..0ac62d9cc5 100644 --- a/api/src/main/java/marquez/db/mappers/JobRowMapper.java +++ b/api/src/main/java/marquez/db/mappers/JobRowMapper.java @@ -46,7 +46,6 @@ public JobRow map(@NonNull ResultSet results, @NonNull StatementContext context) stringOrNull(results, Columns.PARENT_JOB_NAME), stringOrNull(results, Columns.DESCRIPTION), uuidOrNull(results, Columns.CURRENT_VERSION_UUID), - uuidOrNull(results, "current_job_context_uuid"), stringOrNull(results, 
"current_location"), getDatasetFromJsonOrNull(results, "current_inputs"), uuidOrNull(results, Columns.SYMLINK_TARGET_UUID)); diff --git a/api/src/main/java/marquez/db/mappers/JobVersionMapper.java b/api/src/main/java/marquez/db/mappers/JobVersionMapper.java index 42bd45bda6..d3bc4cbda9 100644 --- a/api/src/main/java/marquez/db/mappers/JobVersionMapper.java +++ b/api/src/main/java/marquez/db/mappers/JobVersionMapper.java @@ -5,7 +5,6 @@ package marquez.db.mappers; -import static marquez.db.Columns.mapOrNull; import static marquez.db.Columns.stringOrThrow; import static marquez.db.Columns.timestampOrThrow; import static marquez.db.Columns.urlOrNull; @@ -35,7 +34,7 @@ * Convert a database row to a {@link JobVersion}. For the {@link JobVersion#latestRun}, we delegate * to the {@link RunMapper} with a specified prefix of {@value #RUN_COLUMN_PREFIX}, meaning all * run-related columns should be prefixed in the SQL query. This avoids conflicts between common - * column names, such as created_at, uuid, and context. + * column names, such as created_at and uuid. 
*/ @Slf4j public class JobVersionMapper implements RowMapper { @@ -60,7 +59,6 @@ public JobVersion map(@NonNull ResultSet results, @NonNull StatementContext cont timestampOrThrow(results, Columns.CREATED_AT), Version.of(uuidOrThrow(results, Columns.VERSION)), urlOrNull(results, Columns.LOCATION), - mapOrNull(results, Columns.CONTEXT), toDatasetIdsList(results, Columns.INPUT_DATASETS), toDatasetIdsList(results, Columns.OUTPUT_DATASETS), latestRun); diff --git a/api/src/main/java/marquez/db/mappers/RunFacetsMapper.java b/api/src/main/java/marquez/db/mappers/RunFacetsMapper.java new file mode 100644 index 0000000000..59a241f6ea --- /dev/null +++ b/api/src/main/java/marquez/db/mappers/RunFacetsMapper.java @@ -0,0 +1,24 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.mappers; + +import static marquez.db.Columns.uuidOrNull; +import static marquez.db.mappers.MapperUtils.toFacetsOrNull; + +import java.sql.ResultSet; +import java.sql.SQLException; +import lombok.NonNull; +import marquez.service.models.RunFacets; +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.core.statement.StatementContext; + +public class RunFacetsMapper implements RowMapper { + @Override + public RunFacets map(@NonNull ResultSet results, @NonNull StatementContext context) + throws SQLException { + return new RunFacets(uuidOrNull(results, "run_uuid"), toFacetsOrNull(results, "facets")); + } +} diff --git a/api/src/main/java/marquez/db/mappers/RunMapper.java b/api/src/main/java/marquez/db/mappers/RunMapper.java index e44e0dba39..0dad020ade 100644 --- a/api/src/main/java/marquez/db/mappers/RunMapper.java +++ b/api/src/main/java/marquez/db/mappers/RunMapper.java @@ -83,9 +83,6 @@ public Run map(@NonNull ResultSet results, @NonNull StatementContext context) columnNames.contains(columnPrefix + Columns.OUTPUT_VERSIONS) ? 
toDatasetVersion(results, columnPrefix + Columns.OUTPUT_VERSIONS) : ImmutableList.of(), - columnNames.contains(columnPrefix + Columns.CONTEXT) - ? JobMapper.toContext(results, columnPrefix + Columns.CONTEXT) - : null, toFacetsOrNull(results, columnPrefix + Columns.FACETS)); } diff --git a/api/src/main/java/marquez/db/models/ExtendedJobVersionRow.java b/api/src/main/java/marquez/db/models/ExtendedJobVersionRow.java index d66a531c2b..9b0309056a 100644 --- a/api/src/main/java/marquez/db/models/ExtendedJobVersionRow.java +++ b/api/src/main/java/marquez/db/models/ExtendedJobVersionRow.java @@ -16,7 +16,6 @@ @EqualsAndHashCode(callSuper = true) @ToString(callSuper = true) public class ExtendedJobVersionRow extends JobVersionRow { - @Getter private final String context; @Getter private @NonNull String namespaceName; @Getter private @NonNull String name; @@ -25,13 +24,11 @@ public ExtendedJobVersionRow( final Instant createdAt, final Instant updatedAt, final UUID jobUuid, - final UUID jobContextUuid, final List inputUuids, final List outputUuids, final String location, final UUID version, final UUID latestRunUuid, - @NonNull final String context, @NonNull final String namespaceName, @NonNull final String name, @NonNull final UUID namespaceUuid) { @@ -41,7 +38,6 @@ public ExtendedJobVersionRow( updatedAt, jobUuid, name, - jobContextUuid, inputUuids, outputUuids, location, @@ -49,7 +45,6 @@ public ExtendedJobVersionRow( latestRunUuid, namespaceUuid, namespaceName); - this.context = context; this.namespaceName = namespaceName; this.name = name; } diff --git a/api/src/main/java/marquez/db/models/JobContextRow.java b/api/src/main/java/marquez/db/models/JobContextRow.java deleted file mode 100644 index e7c6b68daa..0000000000 --- a/api/src/main/java/marquez/db/models/JobContextRow.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright 2018-2023 contributors to the Marquez project - * SPDX-License-Identifier: Apache-2.0 - */ - -package marquez.db.models; - -import 
java.time.Instant; -import java.util.UUID; -import lombok.NonNull; -import lombok.Value; - -@Value -public class JobContextRow { - @NonNull UUID uuid; - @NonNull Instant createdAt; - @NonNull String context; - @NonNull String checksum; -} diff --git a/api/src/main/java/marquez/db/models/JobRow.java b/api/src/main/java/marquez/db/models/JobRow.java index e7ea7feaff..a71b60fdd8 100644 --- a/api/src/main/java/marquez/db/models/JobRow.java +++ b/api/src/main/java/marquez/db/models/JobRow.java @@ -26,7 +26,6 @@ public class JobRow { @Nullable String parentJobName; @Nullable String description; @Nullable UUID currentVersionUuid; - @Nullable UUID jobContextUuid; @Nullable String location; @Nullable Set inputs; @Nullable UUID symlinkTargetId; @@ -38,8 +37,4 @@ public Optional getDescription() { public Optional getCurrentVersionUuid() { return Optional.ofNullable(currentVersionUuid); } - - public Optional getJobContextUuid() { - return Optional.ofNullable(jobContextUuid); - } } diff --git a/api/src/main/java/marquez/db/models/JobVersionRow.java b/api/src/main/java/marquez/db/models/JobVersionRow.java index bd55b7adea..ca84dcd16d 100644 --- a/api/src/main/java/marquez/db/models/JobVersionRow.java +++ b/api/src/main/java/marquez/db/models/JobVersionRow.java @@ -25,7 +25,6 @@ public class JobVersionRow { @Getter @NonNull private final Instant updateAt; @Getter @NonNull private final UUID jobUuid; @Getter @NonNull private final String jobName; - @Getter @NonNull private final UUID jobContextUuid; @Getter @NonNull private final List inputUuids; @Getter @NonNull private final List outputUuids; @Nullable private final String location; diff --git a/api/src/main/java/marquez/db/models/UpdateLineageRow.java b/api/src/main/java/marquez/db/models/UpdateLineageRow.java index 996178ea5b..620c75a4b2 100644 --- a/api/src/main/java/marquez/db/models/UpdateLineageRow.java +++ b/api/src/main/java/marquez/db/models/UpdateLineageRow.java @@ -17,7 +17,6 @@ public class UpdateLineageRow { private 
NamespaceRow namespace; private JobRow job; - private JobContextRow jobContext; private RunArgsRow runArgs; private RunRow run; private RunStateRow runState; diff --git a/api/src/main/java/marquez/graphql/GraphqlDaos.java b/api/src/main/java/marquez/graphql/GraphqlDaos.java index 679baeddf2..ad979abc74 100644 --- a/api/src/main/java/marquez/graphql/GraphqlDaos.java +++ b/api/src/main/java/marquez/graphql/GraphqlDaos.java @@ -105,9 +105,6 @@ List> getDistinctJobVersionsByDatasetVersionOutput( @SqlQuery("SELECT * from owners where name = :ownerName") RowMap getCurrentOwnerByNamespace(String ownerName); - @SqlQuery("SELECT * from job_contexts where uuid = :uuid") - RowMap getJobContext(UUID uuid); - @SqlQuery("SELECT * from datasets_view where namespace_uuid = :namespaceUuid") List> getDatasetsByNamespace(UUID namespaceUuid); @@ -117,7 +114,7 @@ List> getDistinctJobVersionsByDatasetVersionOutput( @SqlQuery( "SELECT jv.uuid, jv.created_at, jv.updated_at, jv.job_uuid, jv.version, jv.location, " - + " jv.latest_run_uuid, jv.job_context_uuid, j.namespace_uuid, j.namespace_name, " + + " jv.latest_run_uuid, j.namespace_uuid, j.namespace_name, " + " j.name AS job_name " + " FROM job_versions_io_mapping m " + " inner join job_versions jv " @@ -128,7 +125,7 @@ List> getDistinctJobVersionsByDatasetVersionOutput( @SqlQuery( "SELECT jv.uuid, jv.created_at, jv.updated_at, jv.job_uuid, jv.version, jv.location, " - + " jv.latest_run_uuid, jv.job_context_uuid, j.namespace_uuid, j.namespace_name, " + + " jv.latest_run_uuid, j.namespace_uuid, j.namespace_name, " + " j.name AS job_name " + " from job_versions jv " + " inner join jobs_view j ON j.uuid=jv.job_uuid " @@ -137,7 +134,7 @@ List> getDistinctJobVersionsByDatasetVersionOutput( @SqlQuery( "SELECT jv.uuid, jv.created_at, jv.updated_at, jv.job_uuid, jv.version, jv.location, " - + " jv.latest_run_uuid, jv.job_context_uuid, j.namespace_uuid, j.namespace_name, " + + " jv.latest_run_uuid, j.namespace_uuid, j.namespace_name, " + " 
j.name AS job_name " + " from job_versions jv " + " inner join jobs_view j ON j.uuid=jv.job_uuid " diff --git a/api/src/main/java/marquez/graphql/GraphqlDataFetchers.java b/api/src/main/java/marquez/graphql/GraphqlDataFetchers.java index 0ad1e8defe..cad3dfab7d 100644 --- a/api/src/main/java/marquez/graphql/GraphqlDataFetchers.java +++ b/api/src/main/java/marquez/graphql/GraphqlDataFetchers.java @@ -231,19 +231,6 @@ public DataFetcher getCurrentOwnerByNamespace() { }; } - public DataFetcher getJobContextByJobVersion() { - return dataFetchingEnvironment -> { - Map map = dataFetchingEnvironment.getSource(); - - Map jobContext = dao.getJobContext((UUID) map.get("jobContextUuid")); - if (jobContext == null) { - return null; - } - return Utils.fromJson( - (String) jobContext.get("context"), new TypeReference>() {}); - }; - } - public DataFetcher getLatestRunByJobVersion() { return dataFetchingEnvironment -> { Map map = dataFetchingEnvironment.getSource(); diff --git a/api/src/main/java/marquez/graphql/GraphqlSchemaBuilder.java b/api/src/main/java/marquez/graphql/GraphqlSchemaBuilder.java index f7aaad54ac..56f43be597 100644 --- a/api/src/main/java/marquez/graphql/GraphqlSchemaBuilder.java +++ b/api/src/main/java/marquez/graphql/GraphqlSchemaBuilder.java @@ -101,7 +101,6 @@ public void buildRuntimeWiring(Jdbi jdbi, Builder wiring) { .dataFetcher("datasets", dataFetchers.getDatasetsByNamespace())) .type( newTypeWiring("JobVersion") - .dataFetcher("jobContext", dataFetchers.getJobContextByJobVersion()) .dataFetcher("latestRun", dataFetchers.getLatestRunByJobVersion()) .dataFetcher("job", dataFetchers.getJobByJobVersion()) .dataFetcher("inputs", dataFetchers.getInputsByJobVersion()) diff --git a/api/src/main/java/marquez/service/DelegatingDaos.java b/api/src/main/java/marquez/service/DelegatingDaos.java index eea955f01f..13011e4ffc 100644 --- a/api/src/main/java/marquez/service/DelegatingDaos.java +++ b/api/src/main/java/marquez/service/DelegatingDaos.java @@ -11,7 +11,6 @@ 
import marquez.db.DatasetDao; import marquez.db.DatasetFieldDao; import marquez.db.DatasetVersionDao; -import marquez.db.JobContextDao; import marquez.db.JobDao; import marquez.db.JobVersionDao; import marquez.db.LineageDao; @@ -40,11 +39,6 @@ public static class DelegatingDatasetVersionDao implements DatasetVersionDao { @Delegate private final DatasetVersionDao delegate; } - @RequiredArgsConstructor - public static class DelegatingJobContextDao implements JobContextDao { - @Delegate private final JobContextDao delegate; - } - @RequiredArgsConstructor public static class DelegatingJobDao implements JobDao { @Delegate private final JobDao delegate; diff --git a/api/src/main/java/marquez/service/models/Job.java b/api/src/main/java/marquez/service/models/Job.java index a7d2afe44e..4ed2d3fbd4 100644 --- a/api/src/main/java/marquez/service/models/Job.java +++ b/api/src/main/java/marquez/service/models/Job.java @@ -37,7 +37,6 @@ public final class Job { @Getter @Setter private Set inputs; @Getter @Setter private Set outputs; @Nullable private final URL location; - @Getter private final ImmutableMap context; @Nullable private final String description; @Nullable @Setter private Run latestRun; @Getter private final ImmutableMap facets; @@ -54,7 +53,6 @@ public Job( @NonNull final Set inputs, @NonNull final Set outputs, @Nullable final URL location, - @Nullable final ImmutableMap context, @Nullable final String description, @Nullable final Run latestRun, @Nullable final ImmutableMap facets, @@ -70,7 +68,6 @@ public Job( this.inputs = inputs; this.outputs = outputs; this.location = location; - this.context = (context == null) ? ImmutableMap.of() : context; this.description = description; this.latestRun = latestRun; this.facets = (facets == null) ? 
ImmutableMap.of() : facets; diff --git a/api/src/main/java/marquez/service/models/JobData.java b/api/src/main/java/marquez/service/models/JobData.java index 8b12e6fd27..e15e9cb008 100644 --- a/api/src/main/java/marquez/service/models/JobData.java +++ b/api/src/main/java/marquez/service/models/JobData.java @@ -6,7 +6,6 @@ package marquez.service.models; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import java.net.URL; import java.time.Instant; @@ -43,7 +42,6 @@ public class JobData implements NodeData { @Setter ImmutableSet outputs = ImmutableSet.of(); @Setter ImmutableSet outputUuids = ImmutableSet.of(); @Nullable URL location; - @NonNull ImmutableMap context; @Nullable String description; @Nullable @Setter Run latestRun; diff --git a/api/src/main/java/marquez/service/models/JobFacets.java b/api/src/main/java/marquez/service/models/JobFacets.java new file mode 100644 index 0000000000..71bbae7366 --- /dev/null +++ b/api/src/main/java/marquez/service/models/JobFacets.java @@ -0,0 +1,26 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service.models; + +import com.google.common.collect.ImmutableMap; +import java.util.UUID; +import javax.annotation.Nullable; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; + +@EqualsAndHashCode +@ToString +public class JobFacets { + @Getter private UUID runId; + @Getter private ImmutableMap facets; + + public JobFacets(@NonNull final UUID runId, @Nullable final ImmutableMap facets) { + this.runId = runId; + this.facets = (facets == null) ? 
ImmutableMap.of() : facets; + } +} diff --git a/api/src/main/java/marquez/service/models/JobMeta.java b/api/src/main/java/marquez/service/models/JobMeta.java index 1cac7c248a..115fd5ac1d 100644 --- a/api/src/main/java/marquez/service/models/JobMeta.java +++ b/api/src/main/java/marquez/service/models/JobMeta.java @@ -5,7 +5,6 @@ package marquez.service.models; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import java.net.URL; import java.util.Optional; @@ -25,7 +24,6 @@ public final class JobMeta { @Getter private final ImmutableSet inputs; @Getter private final ImmutableSet outputs; @Nullable private final URL location; - @Getter private final ImmutableMap context; @Nullable private final String description; @Nullable private final RunId runId; @@ -34,14 +32,12 @@ public JobMeta( @NonNull final ImmutableSet inputs, @NonNull final ImmutableSet outputs, @Nullable final URL location, - @Nullable final ImmutableMap context, @Nullable final String description, @Nullable final RunId runId) { this.type = type; this.inputs = inputs; this.outputs = outputs; this.location = location; - this.context = (context == null) ? 
ImmutableMap.of() : context; this.description = description; this.runId = runId; } diff --git a/api/src/main/java/marquez/service/models/Run.java b/api/src/main/java/marquez/service/models/Run.java index 20af5cce48..99231772f9 100644 --- a/api/src/main/java/marquez/service/models/Run.java +++ b/api/src/main/java/marquez/service/models/Run.java @@ -60,7 +60,6 @@ public final class Run { private final String location; @Getter private final List inputVersions; @Getter private final List outputVersions; - @Getter private final Map context; @Getter private final ImmutableMap facets; public Run( @@ -80,7 +79,6 @@ public Run( String location, List inputVersions, List outputVersions, - Map context, @Nullable final ImmutableMap facets) { this.id = id; this.createdAt = createdAt; @@ -98,7 +96,6 @@ public Run( this.location = location; this.inputVersions = inputVersions; this.outputVersions = outputVersions; - this.context = context; this.facets = (facets == null) ? ImmutableMap.of() : facets; } @@ -166,7 +163,6 @@ public static class Builder { private String location; private List inputVersions; private List outputVersions; - private Map context; @JsonInclude(JsonInclude.Include.NON_NULL) private ImmutableMap facets; @@ -189,7 +185,6 @@ public Run build() { location, inputVersions, outputVersions, - context, facets); } } diff --git a/api/src/main/java/marquez/service/models/RunFacets.java b/api/src/main/java/marquez/service/models/RunFacets.java new file mode 100644 index 0000000000..6a88813a10 --- /dev/null +++ b/api/src/main/java/marquez/service/models/RunFacets.java @@ -0,0 +1,26 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service.models; + +import com.google.common.collect.ImmutableMap; +import java.util.UUID; +import javax.annotation.Nullable; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; + +@EqualsAndHashCode +@ToString 
+public class RunFacets { + @Getter private UUID runId; + @Getter private ImmutableMap facets; + + public RunFacets(@NonNull final UUID runId, @Nullable final ImmutableMap facets) { + this.runId = runId; + this.facets = (facets == null) ? ImmutableMap.of() : facets; + } +} diff --git a/api/src/main/resources/marquez/db/migration/V60__alter_job_versions_to_drop_job_context_uuid.sql b/api/src/main/resources/marquez/db/migration/V60__alter_job_versions_to_drop_job_context_uuid.sql new file mode 100644 index 0000000000..1ae8cf40b3 --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V60__alter_job_versions_to_drop_job_context_uuid.sql @@ -0,0 +1,3 @@ +/* SPDX-License-Identifier: Apache-2.0 */ + +ALTER TABLE job_versions ALTER COLUMN job_context_uuid DROP NOT NULL; diff --git a/api/src/main/resources/schema.graphqls b/api/src/main/resources/schema.graphqls index 11a421cafd..781b58fb73 100644 --- a/api/src/main/resources/schema.graphqls +++ b/api/src/main/resources/schema.graphqls @@ -99,7 +99,6 @@ type JobVersion { updatedAt: DateTime location: String version: UUID - jobContext: Json latestRun: Run job: Job inputs: [Dataset] diff --git a/api/src/test/java/marquez/BaseIntegrationTest.java b/api/src/test/java/marquez/BaseIntegrationTest.java index a29ce266f3..30eb8bfa4c 100644 --- a/api/src/test/java/marquez/BaseIntegrationTest.java +++ b/api/src/test/java/marquez/BaseIntegrationTest.java @@ -8,7 +8,6 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static marquez.common.models.CommonModelGenerator.newConnectionUrl; import static marquez.common.models.CommonModelGenerator.newConnectionUrlFor; -import static marquez.common.models.CommonModelGenerator.newContext; import static marquez.common.models.CommonModelGenerator.newDatasetName; import static marquez.common.models.CommonModelGenerator.newDbSourceType; import static marquez.common.models.CommonModelGenerator.newDescription; @@ -22,7 +21,6 @@ import static 
marquez.common.models.CommonModelGenerator.newSourceName; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.dropwizard.testing.ConfigOverride; import io.dropwizard.testing.ResourceHelpers; @@ -108,7 +106,6 @@ public abstract class BaseIntegrationTest { protected static JobId JOB_ID; protected static JobType JOB_TYPE; protected static URL JOB_LOCATION; - protected static ImmutableMap JOB_CONTEXT; protected static String JOB_DESCRIPTION; protected static JobMeta JOB_META; @@ -169,7 +166,6 @@ protected static void setupAll() throws Exception { JOB_ID = new JobId(NAMESPACE_NAME, JOB_NAME); JOB_TYPE = JobType.BATCH; JOB_LOCATION = newLocation(); - JOB_CONTEXT = newContext(); JOB_DESCRIPTION = newDescription(); JOB_META = JobMeta.builder() @@ -177,7 +173,6 @@ protected static void setupAll() throws Exception { .inputs(ImmutableSet.of()) .outputs(ImmutableSet.of()) .location(JOB_LOCATION) - .context(JOB_CONTEXT) .description(JOB_DESCRIPTION) .build(); diff --git a/api/src/test/java/marquez/DatasetIntegrationTest.java b/api/src/test/java/marquez/DatasetIntegrationTest.java index e4cedf215d..86e0af5591 100644 --- a/api/src/test/java/marquez/DatasetIntegrationTest.java +++ b/api/src/test/java/marquez/DatasetIntegrationTest.java @@ -249,7 +249,6 @@ public void testApp_getDBTableVersionWithRun() { .inputs(ImmutableSet.of()) .outputs(NAMESPACE_NAME, "table1") .location(JOB_LOCATION) - .context(JOB_CONTEXT) .description(JOB_DESCRIPTION) .build(); diff --git a/api/src/test/java/marquez/FlowIntegrationTest.java b/api/src/test/java/marquez/FlowIntegrationTest.java index 037b5058c7..b4e7ff631d 100644 --- a/api/src/test/java/marquez/FlowIntegrationTest.java +++ b/api/src/test/java/marquez/FlowIntegrationTest.java @@ -181,7 +181,6 @@ private Job createJob(String runId) { .inputs(NAMESPACE_NAME, DATASET_NAME) .outputs(NAMESPACE_NAME, DATASET_NAME) .location(JOB_LOCATION) - 
.context(JOB_CONTEXT) .description(JOB_DESCRIPTION) .runId(runId) .build(); diff --git a/api/src/test/java/marquez/MarquezAppIntegrationTest.java b/api/src/test/java/marquez/MarquezAppIntegrationTest.java index 44921500f8..7d2245b375 100644 --- a/api/src/test/java/marquez/MarquezAppIntegrationTest.java +++ b/api/src/test/java/marquez/MarquezAppIntegrationTest.java @@ -15,7 +15,6 @@ import static org.assertj.core.api.Assertions.assertThat; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -388,7 +387,6 @@ public void testApp_createJobAndMarkRunAsComplete() { .inputs(inputs) .outputs(outputs) .location(JOB_LOCATION) - .context(JOB_CONTEXT) .description(JOB_DESCRIPTION) .build(); @@ -401,7 +399,6 @@ public void testApp_createJobAndMarkRunAsComplete() { assertThat(job.getInputs()).isEqualTo(inputs); assertThat(job.getOutputs()).isEmpty(); assertThat(job.getLocation()).isEqualTo(Optional.of(JOB_LOCATION)); - assertThat(job.getContext()).isEqualTo(JOB_CONTEXT); assertThat(job.getDescription()).isEqualTo(Optional.of(JOB_DESCRIPTION)); assertThat(job.getLatestRun()).isEmpty(); @@ -435,18 +432,12 @@ public void testApp_createJobAndMarkRunAsComplete() { // (8) Modify context in job metadata to create new job version; the version // will be linked to the provided run ID - final ImmutableMap modifiedJobContext = - new ImmutableMap.Builder() - .putAll(JOB_CONTEXT) - .put("key0", "value0") - .build(); final JobMeta jobMetaWithRunId = JobMeta.builder() .type(JOB_TYPE) .inputs(inputs) .outputs(outputs) .location(JOB_LOCATION) - .context(modifiedJobContext) .description(JOB_DESCRIPTION) .runId(runStarted.getId()) .build(); @@ -459,7 +450,6 @@ public void testApp_createJobAndMarkRunAsComplete() { assertThat(jobWithNewVersion.getInputs()).isEqualTo(inputs); 
assertThat(jobWithNewVersion.getOutputs()).isEqualTo(outputs); assertThat(jobWithNewVersion.getLocation()).isEqualTo(Optional.of(JOB_LOCATION)); - assertThat(jobWithNewVersion.getContext()).isEqualTo(modifiedJobContext); assertThat(jobWithNewVersion.getDescription()).isEqualTo(Optional.of(JOB_DESCRIPTION)); assertThat(jobWithNewVersion.getLatestRun().get().getId()).isEqualTo(runStarted.getId()); @@ -508,7 +498,6 @@ public void testApp_testLazyInputDataset() { .inputs(ImmutableSet.of()) .outputs(ImmutableSet.of()) .location(JOB_LOCATION) - .context(JOB_CONTEXT) .description(JOB_DESCRIPTION) .build(); @@ -562,7 +551,6 @@ public void testApp_testLazyInputDataset() { .outputs(outputs) .runId(run.getId()) .location(JOB_LOCATION) - .context(JOB_CONTEXT) .description(JOB_DESCRIPTION) .build(); client.createJob(NAMESPACE_NAME, jobName, jobUpdateMeta); @@ -594,7 +582,6 @@ public void testApp_listRunOrder() { .inputs(ImmutableSet.of()) .outputs(ImmutableSet.of()) .location(JOB_LOCATION) - .context(JOB_CONTEXT) .description(JOB_DESCRIPTION) .build(); final Job job = client.createJob(NAMESPACE_NAME, jobName, jobMeta); @@ -657,7 +644,6 @@ public void testApp_getJob() throws SQLException { .inputs(ImmutableSet.of()) .outputs(ImmutableSet.of()) .location(JOB_LOCATION) - .context(JOB_CONTEXT) .description(JOB_DESCRIPTION) .build(); final Job originalJob = client.createJob(NAMESPACE_NAME, jobName, jobMeta); @@ -669,7 +655,6 @@ public void testApp_getJob() throws SQLException { .inputs(ImmutableSet.of()) .outputs(ImmutableSet.of()) .location(JOB_LOCATION) - .context(JOB_CONTEXT) .description(JOB_DESCRIPTION) .build(); final Job targetJob = client.createJob(NAMESPACE_NAME, targetJobName, targetJobMeta); @@ -692,7 +677,6 @@ public void testApp_getJob() throws SQLException { NAMESPACE_NAME, jobName, JOB_DESCRIPTION, - j.getJobContextUuid().orElse(null), JOB_LOCATION.toString(), targetJobRow.get().getUuid(), inputs); @@ -721,7 +705,6 @@ public void testApp_getJobWithFQNFromParent() 
throws SQLException { .inputs(ImmutableSet.of()) .outputs(ImmutableSet.of()) .location(JOB_LOCATION) - .context(JOB_CONTEXT) .description(JOB_DESCRIPTION) .build(); final Job originalJob = client.createJob(NAMESPACE_NAME, jobName, jobMeta); @@ -733,7 +716,6 @@ public void testApp_getJobWithFQNFromParent() throws SQLException { .inputs(ImmutableSet.of()) .outputs(ImmutableSet.of()) .location(JOB_LOCATION) - .context(JOB_CONTEXT) .description(JOB_DESCRIPTION) .build(); final Job parentJob = client.createJob(NAMESPACE_NAME, parentJobName, parentJobMeta); @@ -766,7 +748,6 @@ public void testApp_getJobWithFQNFromParent() throws SQLException { namespaceRow.get().getName(), jobRow.getName(), jobRow.getDescription().orElse(null), - jobRow.getJobContextUuid().orElse(null), jobRow.getLocation(), null, inputs); @@ -779,7 +760,6 @@ public void testApp_getJobWithFQNFromParent() throws SQLException { NAMESPACE_NAME, jobName, JOB_DESCRIPTION, - jobRow.getJobContextUuid().orElse(null), JOB_LOCATION.toString(), targetJobRow.getUuid(), inputs); diff --git a/api/src/test/java/marquez/RunIntegrationTest.java b/api/src/test/java/marquez/RunIntegrationTest.java index f1ed560666..65117d9dae 100644 --- a/api/src/test/java/marquez/RunIntegrationTest.java +++ b/api/src/test/java/marquez/RunIntegrationTest.java @@ -69,7 +69,6 @@ public void testApp_updateOutputDataset() { .inputs(ImmutableSet.of()) .outputs(NAMESPACE_NAME, "my-output-ds") .location(JOB_LOCATION) - .context(JOB_CONTEXT) .description(JOB_DESCRIPTION) .build(); client.createJob(NAMESPACE_NAME, JOB_NAME, JOB_META); @@ -110,7 +109,6 @@ public void testSerialization() throws JsonProcessingException { "location", ImmutableList.of(), ImmutableList.of(), - ImmutableMap.of(), ImmutableMap.of()); ObjectMapper objectMapper = Utils.newObjectMapper(); String json = objectMapper.writeValueAsString(run); diff --git a/api/src/test/java/marquez/api/JobResourceTest.java b/api/src/test/java/marquez/api/JobResourceTest.java new file mode 100644 
index 0000000000..856b34037e --- /dev/null +++ b/api/src/test/java/marquez/api/JobResourceTest.java @@ -0,0 +1,108 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.api; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.ImmutableMap; +import io.dropwizard.testing.junit5.DropwizardExtensionsSupport; +import io.dropwizard.testing.junit5.ResourceExtension; +import java.util.Map; +import java.util.UUID; +import marquez.common.Utils; +import marquez.db.JobFacetsDao; +import marquez.db.JobVersionDao; +import marquez.db.RunFacetsDao; +import marquez.service.RunService; +import marquez.service.ServiceFactory; +import marquez.service.models.JobFacets; +import marquez.service.models.RunFacets; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(DropwizardExtensionsSupport.class) +class JobResourceTest { + private static ResourceExtension UNDER_TEST; + private static JobFacets JOB_FACETS; + private static RunFacets RUN_FACETS; + private static UUID RUN_UUID = UUID.fromString("a32f2800-7782-3ce3-b77e-eeeeaded3cf3"); + + static { + RunService runService = mock(RunService.class); + JobVersionDao jobVersionDao = mock(JobVersionDao.class); + JobFacetsDao jobFacetsDao = mock(JobFacetsDao.class); + RunFacetsDao runFacetsDao = mock(RunFacetsDao.class); + + ImmutableMap jobFacets = + Utils.fromJson( + JobResourceTest.class.getResourceAsStream("/facets/job_facets.json"), + new TypeReference<>() {}); + ImmutableMap runFacets = + Utils.fromJson( + JobResourceTest.class.getResourceAsStream("/facets/run_facets.json"), + new TypeReference<>() {}); + JOB_FACETS = new 
JobFacets(RUN_UUID, jobFacets); + RUN_FACETS = new RunFacets(RUN_UUID, runFacets); + when(runService.exists(RUN_UUID)).thenReturn(true); + when(jobFacetsDao.findJobFacetsByRunUuid(any(UUID.class))).thenReturn(JOB_FACETS); + when(runFacetsDao.findRunFacetsByRunUuid(any(UUID.class))).thenReturn(RUN_FACETS); + + ServiceFactory serviceFactory = + ApiTestUtils.mockServiceFactory(Map.of(RunService.class, runService)); + + UNDER_TEST = + ResourceExtension.builder() + .addResource(new JobResource(serviceFactory, jobVersionDao, jobFacetsDao, runFacetsDao)) + .build(); + } + + @Test + public void testGetJobFacetsByRunUUid() { + assertThat( + UNDER_TEST + .target("/api/v1/jobs/321/runs") + .queryParam("type", "job") + .request() + .get() + .getStatus()) + .isEqualTo(404); + final JobFacets testJobFacets = + UNDER_TEST + .target("/api/v1/jobs/runs/a32f2800-7782-3ce3-b77e-eeeeaded3cf3/facets") + .queryParam("type", "job") + .request() + .get() + .readEntity(JobFacets.class); + + assertEquals(testJobFacets, JOB_FACETS); + } + + @Test + public void testGetRunFacetsByRunUUid() { + assertThat( + UNDER_TEST + .target("/api/v1/jobs/321/runs") + .queryParam("type", "run") + .request() + .get() + .getStatus()) + .isEqualTo(404); + final RunFacets testRunFacets = + UNDER_TEST + .target("/api/v1/jobs/runs/a32f2800-7782-3ce3-b77e-eeeeaded3cf3/facets") + .queryParam("type", "run") + .request() + .get() + .readEntity(RunFacets.class); + + assertEquals(testRunFacets, RUN_FACETS); + } +} diff --git a/api/src/test/java/marquez/common/UtilsTest.java b/api/src/test/java/marquez/common/UtilsTest.java index 0481a06683..287c04b7bc 100644 --- a/api/src/test/java/marquez/common/UtilsTest.java +++ b/api/src/test/java/marquez/common/UtilsTest.java @@ -158,7 +158,6 @@ public void testNewJobVersionFor_equal() { jobName, jobMeta.getInputs(), jobMeta.getOutputs(), - jobMeta.getContext(), jobMeta.getLocation().map(URL::toString).orElse(null)); final Version version1 = Utils.newJobVersionFor( @@ -166,7 
+165,6 @@ public void testNewJobVersionFor_equal() { jobName, jobMeta.getInputs(), jobMeta.getOutputs(), - jobMeta.getContext(), jobMeta.getLocation().map(URL::toString).orElse(null)); assertThat(version0).isEqualTo(version1); } @@ -184,7 +182,6 @@ public void testNewJobVersionFor_equalOnUnsortedInputsAndOutputs() { jobName, jobMeta.getInputs(), jobMeta.getOutputs(), - jobMeta.getContext(), jobMeta.getLocation().map(URL::toString).orElse(null)); // Unsort the job inputs and outputs for version1. final ImmutableSet unsortedJobInputIds = @@ -197,7 +194,6 @@ public void testNewJobVersionFor_equalOnUnsortedInputsAndOutputs() { jobName, unsortedJobInputIds, unsortedJobOutputIds, - jobMeta.getContext(), jobMeta.getLocation().map(URL::toString).orElse(null)); assertThat(version0).isEqualTo(version1); } @@ -216,7 +212,6 @@ public void testNewJobVersionFor_notEqual() { jobName, jobMeta0.getInputs(), jobMeta0.getOutputs(), - jobMeta0.getContext(), jobMeta0.getLocation().map(URL::toString).orElse(null)); final Version version1 = Utils.newJobVersionFor( @@ -224,7 +219,6 @@ public void testNewJobVersionFor_notEqual() { jobName, jobMeta1.getInputs(), jobMeta1.getOutputs(), - jobMeta1.getContext(), jobMeta1.getLocation().map(URL::toString).orElse(null)); assertThat(version0).isNotEqualTo(version1); } diff --git a/api/src/test/java/marquez/common/api/JobResourceIntegrationTest.java b/api/src/test/java/marquez/common/api/JobResourceIntegrationTest.java index 8da501e88f..e479c15e08 100644 --- a/api/src/test/java/marquez/common/api/JobResourceIntegrationTest.java +++ b/api/src/test/java/marquez/common/api/JobResourceIntegrationTest.java @@ -92,7 +92,6 @@ public void testApp_createNonMatchingJobWithRun() { .inputs(ImmutableSet.of()) .outputs(ImmutableSet.of()) .location(JOB_LOCATION) - .context(JOB_CONTEXT) .description(JOB_DESCRIPTION) .build(); client.createJob(NAMESPACE_NAME, JOB_NAME, JOB_META); @@ -106,7 +105,6 @@ public void testApp_createNonMatchingJobWithRun() { 
.inputs(ImmutableSet.of()) .outputs(ImmutableSet.of()) .location(JOB_LOCATION) - .context(JOB_CONTEXT) .description(JOB_DESCRIPTION) .runId(runId) .build(); @@ -123,7 +121,6 @@ public void testApp_createJobWithMissingRun() { .inputs(ImmutableSet.of()) .outputs(ImmutableSet.of()) .location(JOB_LOCATION) - .context(JOB_CONTEXT) .description(JOB_DESCRIPTION) .runId(UUID.randomUUID().toString()) .build(); @@ -139,7 +136,6 @@ public void testApp_createNotExistingDataset() { .inputs(NAMESPACE_NAME, "does-not-exist") .outputs(NAMESPACE_NAME, "does-not-exist") .location(JOB_LOCATION) - .context(JOB_CONTEXT) .description(JOB_DESCRIPTION) .build(); Assertions.assertThrows( @@ -174,7 +170,6 @@ public void testApp_listJobVersions() { .inputs(ImmutableSet.of()) .outputs(ImmutableSet.of()) .location(CommonModelGenerator.newLocation()) - .context(JOB_CONTEXT) .description(JOB_DESCRIPTION) .runId(newRunId) .build(); @@ -253,7 +248,6 @@ public void testApp_getJobVersionWithInputsAndOutputs() { .inputs(ImmutableSet.of(STREAM_ID)) .outputs(ImmutableSet.of(DB_TABLE_ID)) .location(CommonModelGenerator.newLocation()) - .context(JOB_CONTEXT) .description(JOB_DESCRIPTION) .build(); client.createJob(NAMESPACE_NAME, jobName, newVersionMeta); diff --git a/api/src/test/java/marquez/common/models/CommonModelGenerator.java b/api/src/test/java/marquez/common/models/CommonModelGenerator.java index 7630194e6d..d6f00aaa09 100644 --- a/api/src/test/java/marquez/common/models/CommonModelGenerator.java +++ b/api/src/test/java/marquez/common/models/CommonModelGenerator.java @@ -9,7 +9,6 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import java.net.URI; import java.net.URL; @@ -151,11 +150,6 @@ public static URL newLocation() { return Utils.toUrl("https://github.com/repo/test/commit/" + newId()); } - public static ImmutableMap 
newContext() { - return ImmutableMap.of( - "sql", String.format("SELECT * FROM test_table WHERE test_column = '%dH';", newId())); - } - public static RunId newRunId() { return RunId.of(UUID.randomUUID()); } diff --git a/api/src/test/java/marquez/db/BackfillTestUtils.java b/api/src/test/java/marquez/db/BackfillTestUtils.java index 354bc08a22..92fddcee64 100644 --- a/api/src/test/java/marquez/db/BackfillTestUtils.java +++ b/api/src/test/java/marquez/db/BackfillTestUtils.java @@ -66,7 +66,6 @@ public static RunRow writeNewEvent( namespace.getUuid(), namespace.getName(), jobName, - null, null); NominalTimeRunFacet nominalTimeRunFacet = new NominalTimeRunFacet(); @@ -119,23 +118,10 @@ public static UUID writeJob(Jdbi jdbi, String jobName, Instant now, NamespaceRow pgInputs.setValue("[]"); return jdbi.withHandle( h -> { - UUID jobContextUuid = - h.createQuery( - """ -INSERT INTO job_contexts (uuid, created_at, context, checksum) VALUES (:uuid, :now, :context, :checksum) -ON CONFLICT (checksum) DO UPDATE SET created_at=EXCLUDED.created_at -RETURNING uuid -""") - .bind("uuid", UUID.randomUUID()) - .bind("now", now) - .bind("context", "") - .bind("checksum", "") - .mapTo(UUID.class) - .first(); return h.createQuery( """ INSERT INTO jobs (uuid, type, created_at, updated_at, namespace_uuid, name, namespace_name, current_job_context_uuid, current_inputs) - VALUES (:uuid, :type, :now, :now, :namespaceUuid, :name, :namespaceName, :currentJobContextUuid, :currentInputs) + VALUES (:uuid, :type, :now, :now, :namespaceUuid, :name, :namespaceName, null, :currentInputs) RETURNING uuid """) .bind("uuid", UUID.randomUUID()) @@ -144,7 +130,6 @@ INSERT INTO jobs (uuid, type, created_at, updated_at, namespace_uuid, name, name .bind("namespaceUuid", namespace.getUuid()) .bind("name", jobName) .bind("namespaceName", namespace.getName()) - .bind("currentJobContextUuid", jobContextUuid) .bind("currentInputs", pgInputs) .mapTo(UUID.class) .first(); diff --git 
a/api/src/test/java/marquez/db/DbTestUtils.java b/api/src/test/java/marquez/db/DbTestUtils.java index ca18c4cd2e..01ede91b58 100644 --- a/api/src/test/java/marquez/db/DbTestUtils.java +++ b/api/src/test/java/marquez/db/DbTestUtils.java @@ -7,7 +7,6 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static marquez.Generator.newTimestamp; -import static marquez.common.models.CommonModelGenerator.newContext; import static marquez.common.models.CommonModelGenerator.newDatasetName; import static marquez.common.models.CommonModelGenerator.newDescription; import static marquez.common.models.CommonModelGenerator.newExternalId; @@ -41,7 +40,6 @@ import marquez.common.models.RunState; import marquez.db.models.DatasetRow; import marquez.db.models.ExtendedJobVersionRow; -import marquez.db.models.JobContextRow; import marquez.db.models.JobRow; import marquez.db.models.JobVersionRow; import marquez.db.models.NamespaceRow; @@ -137,14 +135,7 @@ public static JobRow createJobWithoutSymlinkTarget( jdbi, namespace.getName(), jobName, - new JobMeta( - JobType.BATCH, - ImmutableSet.of(), - ImmutableSet.of(), - null, - ImmutableMap.of(), - description, - null)); + new JobMeta(JobType.BATCH, ImmutableSet.of(), ImmutableSet.of(), null, description, null)); } public static JobRow createJobWithSymlinkTarget( @@ -154,14 +145,7 @@ public static JobRow createJobWithSymlinkTarget( namespace.getName(), jobName, jobSymlinkId, - new JobMeta( - JobType.BATCH, - ImmutableSet.of(), - ImmutableSet.of(), - null, - ImmutableMap.of(), - description, - null)); + new JobMeta(JobType.BATCH, ImmutableSet.of(), ImmutableSet.of(), null, description, null)); } /** @@ -206,20 +190,10 @@ static JobRow newJobWith( Utils.getMapper()); } - /** Adds a new {@link JobContextRow} object to the {@code job_contexts} table. 
*/ - static JobContextRow newJobContext(final Jdbi jdbi) { - final JobContextDao jobContextDao = jdbi.onDemand(JobContextDao.class); - final ImmutableMap context = newContext(); - final String contextAsJson = Utils.toJson(newContext()); - final String checksum = Utils.checksumFor(context); - return jobContextDao.upsert(newRowUuid(), newTimestamp(), contextAsJson, checksum); - } - /** Adds a new {@link JobVersionRow} object to the {@code job_versions} table. */ static ExtendedJobVersionRow newJobVersion( final Jdbi jdbi, final UUID jobUuid, - final UUID jobContextUuid, final UUID version, final String jobName, final UUID namespaceUuid, @@ -229,7 +203,6 @@ static ExtendedJobVersionRow newJobVersion( newRowUuid(), newTimestamp(), jobUuid, - jobContextUuid, newLocation().toString(), version, jobName, @@ -263,8 +236,7 @@ static RunRow newRun( final UUID namespaceUuid, final String namespaceName, final String jobName, - final String jobLocation, - final UUID jobContextUuid) { + final String jobLocation) { final RunDao runDao = jdbi.onDemand(RunDao.class); return runDao.upsert( newRowUuid(), @@ -279,8 +251,7 @@ static RunRow newRun( namespaceUuid, namespaceName, jobName, - jobLocation, - jobContextUuid); + jobLocation); } /** Transition a {@link Run} to the provided {@link RunState}. 
*/ diff --git a/api/src/test/java/marquez/db/JobDaoTest.java b/api/src/test/java/marquez/db/JobDaoTest.java index dc76d76ef2..3284dc2b75 100644 --- a/api/src/test/java/marquez/db/JobDaoTest.java +++ b/api/src/test/java/marquez/db/JobDaoTest.java @@ -15,16 +15,13 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableMap; import java.sql.SQLException; import java.time.Instant; import java.util.List; import java.util.Optional; import java.util.UUID; -import marquez.common.Utils; import marquez.common.models.JobType; import marquez.db.models.DbModelGenerator; -import marquez.db.models.JobContextRow; import marquez.db.models.JobRow; import marquez.db.models.NamespaceRow; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; @@ -191,9 +188,6 @@ public void testSymlinkParentJobRenamesChildren() throws SQLException { JobRow parentJob = createJobWithoutSymlinkTarget(jdbi, namespace, parentJobName, "the original parent job"); Instant now = Instant.now(); - JobContextRow jobContext = - jdbi.onDemand(JobContextDao.class) - .upsert(UUID.randomUUID(), now, "{}", Utils.checksumFor(ImmutableMap.of())); PGobject inputs = new PGobject(); inputs.setValue("[]"); inputs.setType("JSON"); @@ -208,7 +202,6 @@ public void testSymlinkParentJobRenamesChildren() throws SQLException { namespace.getName(), childJob1Name, null, - jobContext.getUuid(), null, null, inputs); @@ -224,7 +217,6 @@ public void testSymlinkParentJobRenamesChildren() throws SQLException { namespace.getName(), childJob2Name, null, - jobContext.getUuid(), null, null, inputs); diff --git a/api/src/test/java/marquez/db/JobFacetsDaoTest.java b/api/src/test/java/marquez/db/JobFacetsDaoTest.java index 12286d9852..e6b48e7d7d 100644 --- a/api/src/test/java/marquez/db/JobFacetsDaoTest.java +++ b/api/src/test/java/marquez/db/JobFacetsDaoTest.java @@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test; import 
org.junit.jupiter.api.extension.ExtendWith; import org.postgresql.util.PGobject; +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; @ExtendWith(MarquezJdbiExternalPostgresExtension.class) public class JobFacetsDaoTest { @@ -84,6 +85,36 @@ public void insertJobFacets() { + "\"_schemaURL\": \"http://test.schema/\"}}"); } + @Test + public void testGetFacetsByRunUuid() { + LineageEvent.JobFacet jobFacet = + new LineageEvent.JobFacet( + new LineageEvent.DocumentationJobFacet(PRODUCER_URL, SCHEMA_URL, "some-documentation"), + new LineageEvent.SourceCodeLocationJobFacet( + PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git"), + new LineageEvent.SQLJobFacet(PRODUCER_URL, SCHEMA_URL, "some sql query"), + null); + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + openLineageDao, + "job_" + UUID.randomUUID(), + "COMPLETE", + jobFacet, + Collections.emptyList(), + Collections.emptyList(), + new LineageEvent.ParentRunFacet( + PRODUCER_URL, + SCHEMA_URL, + new LineageEvent.RunLink(UUID.randomUUID().toString()), + new LineageEvent.JobLink("namespace", "name")), + ImmutableMap.of().of("custom-run-facet", "some-run-facet")); + + assertThat(jobFacetsDao.findJobFacetsByRunUuid(lineageRow.getRun().getUuid()).getFacets()) + .hasSize(3) + .extracting("sql.query") + .isEqualTo("some sql query"); + } + private List getJobFacetRow() { return jdbi.withHandle( h -> diff --git a/api/src/test/java/marquez/db/JobVersionDaoTest.java b/api/src/test/java/marquez/db/JobVersionDaoTest.java index 7a137a6bc1..45cdbbfa56 100644 --- a/api/src/test/java/marquez/db/JobVersionDaoTest.java +++ b/api/src/test/java/marquez/db/JobVersionDaoTest.java @@ -78,7 +78,6 @@ public void testUpsertJobVersion() { newRowUuid(), newTimestamp(), jobRow.getUuid(), - jobRow.getJobContextUuid().get(), newLocation().toString(), version.getValue(), jobRow.getName(), @@ -94,7 +93,6 @@ public void testUpsertJobVersion() { newRowUuid(), newTimestamp(), 
jobRow.getUuid(), - jobRow.getJobContextUuid().get(), newLocation().toString(), version.getValue(), jobRow.getName(), @@ -117,7 +115,6 @@ public void testUpdateLatestRunFor() { newRowUuid(), newTimestamp(), jobRow.getUuid(), - jobRow.getJobContextUuid().get(), newLocation().toString(), newVersion().getValue(), jobRow.getName(), @@ -136,8 +133,7 @@ public void testUpdateLatestRunFor() { namespaceRow.getUuid(), namespaceRow.getName(), jobVersionRow.getJobName(), - jobVersionRow.getLocation().orElse(null), - jobVersionRow.getJobContextUuid()); + jobVersionRow.getLocation().orElse(null)); // Ensure the latest run is not associated with the job version. final Optional noLatestRunUuid = jobVersionDao.findLatestRunFor(jobVersionRow.getUuid()); @@ -163,7 +159,6 @@ public void testGetJobVersion() { newRowUuid(), newTimestamp(), jobRow.getUuid(), - jobRow.getJobContextUuid().get(), newLocation().toString(), version.getValue(), jobRow.getName(), diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java index 9bbec6283b..e42b1667d5 100644 --- a/api/src/test/java/marquez/db/LineageDaoTest.java +++ b/api/src/test/java/marquez/db/LineageDaoTest.java @@ -208,7 +208,6 @@ public void testGetLineageForSymlinkedJob() throws SQLException { writeJob.getJob().getNamespaceName(), symlinkTargetJobName, writeJob.getJob().getDescription().orElse(null), - writeJob.getJob().getJobContextUuid().orElse(null), writeJob.getJob().getLocation(), null, inputs); @@ -221,7 +220,6 @@ public void testGetLineageForSymlinkedJob() throws SQLException { writeJob.getJob().getNamespaceName(), writeJob.getJob().getName(), writeJob.getJob().getDescription().orElse(null), - writeJob.getJob().getJobContextUuid().orElse(null), writeJob.getJob().getLocation(), targetJob.getUuid(), inputs); diff --git a/api/src/test/java/marquez/db/RunDaoTest.java b/api/src/test/java/marquez/db/RunDaoTest.java index 83fa0ae7d8..3095aebf0f 100644 --- 
a/api/src/test/java/marquez/db/RunDaoTest.java +++ b/api/src/test/java/marquez/db/RunDaoTest.java @@ -49,6 +49,7 @@ class RunDaoTest { private static RunDao runDao; private static Jdbi jdbi; private static JobVersionDao jobVersionDao; + private static OpenLineageDao openLineageDao; static NamespaceRow namespaceRow; static JobRow jobRow; @@ -58,6 +59,7 @@ public static void setUpOnce(Jdbi jdbi) { RunDaoTest.jdbi = jdbi; runDao = jdbi.onDemand(RunDao.class); jobVersionDao = jdbi.onDemand(JobVersionDao.class); + openLineageDao = jdbi.onDemand(OpenLineageDao.class); namespaceRow = DbTestUtils.newNamespace(jdbi); jobRow = DbTestUtils.newJob(jdbi, namespaceRow.getName(), newJobName().getValue()); } @@ -218,7 +220,6 @@ public void updateRowWithNullNominalTimeDoesNotUpdateNominalTime() { namespaceRow.getUuid(), namespaceRow.getName(), jobRow.getName(), - null, null); assertThat(row.getUuid()).isEqualTo(updatedRow.getUuid()); @@ -251,7 +252,6 @@ public void updateRowWithExternalId() { namespaceRow.getUuid(), namespaceRow.getName(), jobRow.getName(), - null, null); runDao.upsert( @@ -267,7 +267,6 @@ public void updateRowWithExternalId() { namespaceRow.getUuid(), namespaceRow.getName(), jobRow.getName(), - null, null); Optional runRowOpt = runDao.findRunByUuidAsExtendedRow(row.getUuid()); diff --git a/api/src/test/java/marquez/db/RunFacetsDaoTest.java b/api/src/test/java/marquez/db/RunFacetsDaoTest.java index f40374e96e..9dfa40c85a 100644 --- a/api/src/test/java/marquez/db/RunFacetsDaoTest.java +++ b/api/src/test/java/marquez/db/RunFacetsDaoTest.java @@ -161,6 +161,36 @@ public void testInsertRunFacetsForCustomFacet() { assertThat(row.facet().toString()).startsWith("{\"custom-run-facet\": \"some-run-facet\"}"); } + @Test + public void testGetFacetsByRunUuid() { + LineageEvent.JobFacet jobFacet = + new LineageEvent.JobFacet( + new LineageEvent.DocumentationJobFacet(PRODUCER_URL, SCHEMA_URL, "some-documentation"), + new LineageEvent.SourceCodeLocationJobFacet( + PRODUCER_URL, 
SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git"), + new LineageEvent.SQLJobFacet(PRODUCER_URL, SCHEMA_URL, "some sql query"), + null); + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + openLineageDao, + "job_" + UUID.randomUUID(), + "COMPLETE", + jobFacet, + Collections.emptyList(), + Collections.emptyList(), + new LineageEvent.ParentRunFacet( + PRODUCER_URL, + SCHEMA_URL, + new LineageEvent.RunLink(UUID.randomUUID().toString()), + new LineageEvent.JobLink("namespace", "name")), + ImmutableMap.of().of("custom-run-facet", "some-run-facet")); + + assertThat(runFacetsDao.findRunFacetsByRunUuid(lineageRow.getRun().getUuid()).getFacets()) + .hasSize(3) + .extracting("custom-run-facet") + .isEqualTo("some-run-facet"); + } + private List getRunFacetRow(String name) { return jdbi.withHandle( h -> diff --git a/api/src/test/java/marquez/db/SearchDaoTest.java b/api/src/test/java/marquez/db/SearchDaoTest.java index 3f74faed99..94bac4690d 100644 --- a/api/src/test/java/marquez/db/SearchDaoTest.java +++ b/api/src/test/java/marquez/db/SearchDaoTest.java @@ -7,7 +7,6 @@ import static org.assertj.core.api.Assertions.assertThat; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import java.net.URL; import java.sql.SQLException; @@ -84,7 +83,6 @@ public static void setUpOnce(final Jdbi jdbi) throws SQLException { ImmutableSet.copyOf(j.getInputs()), ImmutableSet.of(), new URL(j.getLocation()), - ImmutableMap.of(), j.getDescription().orElse(null), null)); PGobject inputs = new PGobject(); @@ -99,7 +97,6 @@ public static void setUpOnce(final Jdbi jdbi) throws SQLException { namespaceRow.getName(), j.getName(), j.getDescription().orElse(null), - j.getJobContextUuid().orElse(null), j.getLocation(), symlinkTargetJob.getUuid(), inputs); diff --git a/api/src/test/java/marquez/db/mappers/JobFacetsMapperTest.java b/api/src/test/java/marquez/db/mappers/JobFacetsMapperTest.java new file mode 100644 index 
0000000000..4370b035c1 --- /dev/null +++ b/api/src/test/java/marquez/db/mappers/JobFacetsMapperTest.java @@ -0,0 +1,73 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.mappers; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.core.type.TypeReference; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.UUID; +import marquez.common.Utils; +import marquez.db.Columns; +import marquez.service.models.JobFacets; +import org.jdbi.v3.core.statement.StatementContext; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.postgresql.util.PGobject; + +class JobFacetsMapperTest { + + private static ResultSet resultSet; + + @BeforeAll + public static void setUp() throws SQLException { + resultSet = mock(ResultSet.class); + when(resultSet.getMetaData()).thenReturn(mock(ResultSetMetaData.class)); + when(resultSet.getObject("run_uuid")) + .thenReturn(UUID.fromString("a32f2800-7782-3ce3-b77e-eeeeaded3cf3")); + when(resultSet.getObject("run_uuid", UUID.class)) + .thenReturn(UUID.fromString("a32f2800-7782-3ce3-b77e-eeeeaded3cf3")); + PGobject facets = new PGobject(); + String sql = + """ + [{ + "documentation": { + "description": "This is test description" + }, + "sql": { + "query": "SELECT 1 AS test" + } + }] + """; + facets.setValue(sql); + when(resultSet.getObject("facets")).thenReturn(facets); + when(resultSet.getString("facets")).thenReturn(facets.toString()); + } + + @Test + public void shouldMapFullJobFacets() throws SQLException { + JobFacetsMapper underTest = new JobFacetsMapper(); + + try (MockedStatic mocked = + Mockito.mockStatic(Columns.class, Mockito.CALLS_REAL_METHODS)) { + when(Columns.exists(resultSet, 
"facets")).thenReturn(true); + + JobFacets actualJobFacets = underTest.map(resultSet, mock(StatementContext.class)); + JobFacets expectedJobFacets = + Utils.fromJson( + this.getClass().getResourceAsStream("/mappers/job_facets_mapper.json"), + new TypeReference() {}); + + assertEquals(expectedJobFacets, actualJobFacets); + } + } +} diff --git a/api/src/test/java/marquez/db/mappers/JobMapperTest.java b/api/src/test/java/marquez/db/mappers/JobMapperTest.java index 93ea0f20cd..6dc8bed3f7 100644 --- a/api/src/test/java/marquez/db/mappers/JobMapperTest.java +++ b/api/src/test/java/marquez/db/mappers/JobMapperTest.java @@ -60,8 +60,6 @@ public static void setUp() throws SQLException, MalformedURLException { .thenReturn(UUID.fromString("b1d626a2-6d3a-475e-9ecf-943176d4a8c6")); when(resultSet.getString("current_location")).thenReturn("https://github.com/"); when(resultSet.getObject("current_location")).thenReturn("https://github.com/"); - when(resultSet.getString(Columns.CONTEXT)).thenReturn("{ \"test\" : \"value\"}"); - when(resultSet.getObject(Columns.CONTEXT)).thenReturn("{ \"test\" : \"value\"}"); when(resultSet.getString(Columns.FACETS)).thenReturn(null); when(resultSet.getObject(Columns.FACETS)).thenReturn(null); PGobject inputs = new PGobject(); diff --git a/api/src/test/java/marquez/db/mappers/RunFacetsMapperTest.java b/api/src/test/java/marquez/db/mappers/RunFacetsMapperTest.java new file mode 100644 index 0000000000..e1438a65dd --- /dev/null +++ b/api/src/test/java/marquez/db/mappers/RunFacetsMapperTest.java @@ -0,0 +1,71 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.mappers; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.core.type.TypeReference; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import 
java.util.UUID; +import marquez.common.Utils; +import marquez.db.Columns; +import marquez.service.models.RunFacets; +import org.jdbi.v3.core.statement.StatementContext; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.postgresql.util.PGobject; + +class RunFacetsMapperTest { + + private static ResultSet resultSet; + + @BeforeAll + public static void setUp() throws SQLException { + resultSet = mock(ResultSet.class); + when(resultSet.getMetaData()).thenReturn(mock(ResultSetMetaData.class)); + when(resultSet.getObject("run_uuid")) + .thenReturn(UUID.fromString("a32f2800-7782-3ce3-b77e-eeeeaded3cf3")); + when(resultSet.getObject("run_uuid", UUID.class)) + .thenReturn(UUID.fromString("a32f2800-7782-3ce3-b77e-eeeeaded3cf3")); + PGobject facets = new PGobject(); + String sql = + """ + [{ + "nominalTime": { + "nominalEndTime": "2020-01-01T00:00:00Z", + "nominalStartTime": "2020-01-01T00:00:00Z" + } + }] + """; + facets.setValue(sql); + when(resultSet.getObject("facets")).thenReturn(facets); + when(resultSet.getString("facets")).thenReturn(facets.toString()); + } + + @Test + public void shouldMapFullRunFacets() throws SQLException { + RunFacetsMapper underTest = new RunFacetsMapper(); + + try (MockedStatic mocked = + Mockito.mockStatic(Columns.class, Mockito.CALLS_REAL_METHODS)) { + when(Columns.exists(resultSet, "facets")).thenReturn(true); + + RunFacets actualRunFacets = underTest.map(resultSet, mock(StatementContext.class)); + RunFacets expectedRunFacets = + Utils.fromJson( + this.getClass().getResourceAsStream("/mappers/run_facets_mapper.json"), + new TypeReference() {}); + + assertEquals(expectedRunFacets, actualRunFacets); + } + } +} diff --git a/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java b/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java index 8ab281218f..0a705ff91d 100644 --- 
a/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java +++ b/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java @@ -166,7 +166,6 @@ public void setup(Jdbi jdbi) throws SQLException { "description", null, null, - null, null); Map runArgsMap = new HashMap<>(); RunArgsRow argsRow = @@ -191,7 +190,6 @@ public void setup(Jdbi jdbi) throws SQLException { Instant.now(), NAMESPACE, job.getName(), - null, null); } diff --git a/api/src/test/java/marquez/service/models/NodeIdTest.java b/api/src/test/java/marquez/service/models/NodeIdTest.java index c1fdf6fb50..bdfc2f8415 100644 --- a/api/src/test/java/marquez/service/models/NodeIdTest.java +++ b/api/src/test/java/marquez/service/models/NodeIdTest.java @@ -144,6 +144,37 @@ public void testDatasetField(String namespace, String dataset, String field) { assertEquals(field, nodeId.asDatasetFieldId().getFieldName().getValue()); } + @ParameterizedTest(name = "testDatasetField-{index} {argumentsWithNames}") + @CsvSource( + value = { + "my-namespace$my-dataset$colA$my-namespace$my-dataset$colB", + "gs://bucket$/path/to/data$colA$gs://bucket$/path/to/data$colB", + "gs://bucket$/path/to/data$col_A$gs://bucket$/path/to/data$col_B" + }, + delimiter = '$') + public void testSameTypeAs( + String namespaceFirst, + String datasetFirst, + String fieldFirst, + String namespaceSecond, + String datasetSecond, + String fieldSecond) { + NamespaceName namespaceFirstName = NamespaceName.of(namespaceFirst); + FieldName fielFirstdName = FieldName.of(fieldFirst); + DatasetName datasetFirstName = DatasetName.of(datasetFirst); + DatasetId dsFirstId = new DatasetId(namespaceFirstName, datasetFirstName); + DatasetFieldId dsFirstfId = new DatasetFieldId(dsFirstId, fielFirstdName); + NodeId nodeFirstId = NodeId.of(dsFirstfId); + + NamespaceName namespaceSecondName = NamespaceName.of(namespaceSecond); + FieldName fielSeconddName = FieldName.of(fieldSecond); + DatasetName datasetSecondName = 
DatasetName.of(datasetSecond); + DatasetId dsSecondId = new DatasetId(namespaceSecondName, datasetSecondName); + DatasetFieldId dsSecondfId = new DatasetFieldId(dsSecondId, fielSeconddName); + NodeId nodeSecondId = NodeId.of(dsSecondfId); + assertTrue(nodeFirstId.sameTypeAs(nodeSecondId)); + } + @ParameterizedTest(name = "testDatasetField-{index} {argumentsWithNames}") @CsvSource( value = { diff --git a/api/src/test/java/marquez/service/models/ServiceModelGenerator.java b/api/src/test/java/marquez/service/models/ServiceModelGenerator.java index 04ee2ee0e6..4962f22e02 100644 --- a/api/src/test/java/marquez/service/models/ServiceModelGenerator.java +++ b/api/src/test/java/marquez/service/models/ServiceModelGenerator.java @@ -7,7 +7,6 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static java.time.temporal.ChronoUnit.HOURS; -import static marquez.common.models.CommonModelGenerator.newContext; import static marquez.common.models.CommonModelGenerator.newDatasetId; import static marquez.common.models.CommonModelGenerator.newDatasetIdsWith; import static marquez.common.models.CommonModelGenerator.newDatasetName; @@ -120,7 +119,6 @@ public static JobMeta newJobMetaWith( newInputsWith(namespaceName, numOfInputs), newOutputsWith(namespaceName, numOfOutputs), newLocation(), - newContext(), newDescription(), null); } diff --git a/api/src/test/resources/facets/job_facets.json b/api/src/test/resources/facets/job_facets.json new file mode 100644 index 0000000000..bd1ecec827 --- /dev/null +++ b/api/src/test/resources/facets/job_facets.json @@ -0,0 +1,12 @@ +{ + "documentation": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DocumentationJobFacet.json", + "description": "Loads newly added restaurant menu items daily." 
+ }, + "sql": { + "query": "INSERT INTO menu_items (id, name, price, category_id, description)\n SELECT id, name, price, category_id, description\n FROM tmp_menu_items;", + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json" + } +} diff --git a/api/src/test/resources/facets/run_facets.json b/api/src/test/resources/facets/run_facets.json new file mode 100644 index 0000000000..35bfdb1371 --- /dev/null +++ b/api/src/test/resources/facets/run_facets.json @@ -0,0 +1,8 @@ +{ + "nominalTime": { + "_producer": "https://github.com/MarquezProject/marquez/blob/main/docker/metadata.json", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-0/NominalTimeRunFacet.json", + "nominalEndTime": "2020-01-01T00:00:00Z", + "nominalStartTime": "2020-01-01T00:00:00Z" + } +} diff --git a/api/src/test/resources/mappers/job_facets_mapper.json b/api/src/test/resources/mappers/job_facets_mapper.json new file mode 100644 index 0000000000..e12dbeecf0 --- /dev/null +++ b/api/src/test/resources/mappers/job_facets_mapper.json @@ -0,0 +1,11 @@ +{ + "runId": "a32f2800-7782-3ce3-b77e-eeeeaded3cf3", + "facets": { + "documentation": { + "description": "This is test description" + }, + "sql": { + "query": "SELECT 1 AS test" + } + } +} diff --git a/api/src/test/resources/mappers/run_facets_mapper.json b/api/src/test/resources/mappers/run_facets_mapper.json new file mode 100644 index 0000000000..0e8e493f43 --- /dev/null +++ b/api/src/test/resources/mappers/run_facets_mapper.json @@ -0,0 +1,9 @@ +{ + "runId": "a32f2800-7782-3ce3-b77e-eeeeaded3cf3", + "facets": { + "nominalTime": { + "nominalEndTime": "2020-01-01T00:00:00Z", + "nominalStartTime": "2020-01-01T00:00:00Z" + } + } +} diff --git a/clients/java/src/main/java/marquez/client/models/Job.java b/clients/java/src/main/java/marquez/client/models/Job.java index 2ffdea9681..288115a4b8 100644 --- 
a/clients/java/src/main/java/marquez/client/models/Job.java +++ b/clients/java/src/main/java/marquez/client/models/Job.java @@ -46,12 +46,11 @@ public Job( final Set inputs, final Set outputs, @Nullable final URL location, - final Map context, final String description, @Nullable final Run latestRun, @Nullable final Map facets, @Nullable UUID currentVersion) { - super(type, inputs, outputs, location, context, description, null); + super(type, inputs, outputs, location, description, null); this.id = id; this.name = name; this.simpleName = simpleName; diff --git a/clients/java/src/main/java/marquez/client/models/JobMeta.java b/clients/java/src/main/java/marquez/client/models/JobMeta.java index f35bcf68d4..68551fdbe7 100644 --- a/clients/java/src/main/java/marquez/client/models/JobMeta.java +++ b/clients/java/src/main/java/marquez/client/models/JobMeta.java @@ -5,10 +5,8 @@ package marquez.client.models; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import java.net.URL; -import java.util.Map; import java.util.Optional; import java.util.Set; import javax.annotation.Nullable; @@ -25,7 +23,6 @@ public class JobMeta { @Getter private final Set inputs; @Getter private final Set outputs; @Nullable private final URL location; - @Getter private final Map context; @Nullable String description; @Nullable String runId; @@ -34,14 +31,12 @@ public JobMeta( @NonNull final Set inputs, @NonNull final Set outputs, @Nullable final URL location, - @Nullable final Map context, @Nullable final String description, @Nullable String runId) { this.type = type; this.inputs = inputs; this.outputs = outputs; this.location = location; - this.context = (context == null) ? 
ImmutableMap.of() : ImmutableMap.copyOf(context); this.description = description; this.runId = runId; } @@ -72,7 +67,6 @@ public static final class Builder { private Set outputs; @Nullable private URL location; @Nullable private String description; - @Nullable Map context; @Nullable String runId; private Builder() { @@ -126,11 +120,6 @@ public Builder location(@NonNull URL location) { return this; } - public Builder context(@Nullable Map context) { - this.context = context; - return this; - } - public Builder description(@Nullable String description) { this.description = description; return this; @@ -142,7 +131,7 @@ public Builder runId(@Nullable String runId) { } public JobMeta build() { - return new JobMeta(type, inputs, outputs, location, context, description, runId); + return new JobMeta(type, inputs, outputs, location, description, runId); } } } diff --git a/clients/java/src/main/java/marquez/client/models/JobVersion.java b/clients/java/src/main/java/marquez/client/models/JobVersion.java index 4b058101ef..0121eb3025 100644 --- a/clients/java/src/main/java/marquez/client/models/JobVersion.java +++ b/clients/java/src/main/java/marquez/client/models/JobVersion.java @@ -6,7 +6,6 @@ package marquez.client.models; import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.collect.ImmutableMap; import java.net.URL; import java.time.Instant; import java.util.List; @@ -28,7 +27,6 @@ public final class JobVersion { @Getter private final UUID version; @Getter private final String namespace; @Nullable private final URL location; - @Getter private final ImmutableMap context; @Getter private final List inputs; @Getter private final List outputs; @Getter @Nullable private final Run latestRun; @@ -39,7 +37,6 @@ public JobVersion( @NonNull final Instant createdAt, @NonNull final UUID version, @Nullable final URL location, - @Nullable final ImmutableMap context, List inputs, List outputs, @Nullable Run latestRun) { @@ -49,7 +46,6 @@ public JobVersion( 
this.version = version; this.namespace = id.getNamespace(); this.location = location; - this.context = (context == null) ? ImmutableMap.of() : context; this.inputs = inputs; this.outputs = outputs; this.latestRun = latestRun; diff --git a/clients/java/src/test/java/marquez/client/MarquezClientTest.java b/clients/java/src/test/java/marquez/client/MarquezClientTest.java index 926ed342a7..ff95f3bdbd 100644 --- a/clients/java/src/test/java/marquez/client/MarquezClientTest.java +++ b/clients/java/src/test/java/marquez/client/MarquezClientTest.java @@ -9,7 +9,6 @@ import static marquez.client.MarquezClient.DEFAULT_BASE_URL; import static marquez.client.MarquezPathV1.BASE_PATH; import static marquez.client.models.ModelGenerator.newConnectionUrl; -import static marquez.client.models.ModelGenerator.newContext; import static marquez.client.models.ModelGenerator.newDatasetFacets; import static marquez.client.models.ModelGenerator.newDatasetIdWith; import static marquez.client.models.ModelGenerator.newDatasetPhysicalName; @@ -237,7 +236,6 @@ public class MarquezClientTest { private static final URL LOCATION = newLocation(); private static final JobType JOB_TYPE = newJobType(); private static final String JOB_DESCRIPTION = newDescription(); - private static final Map JOB_CONTEXT = newContext(); private static final Job JOB = new Job( JOB_ID, @@ -251,7 +249,6 @@ public class MarquezClientTest { INPUTS, OUTPUTS, LOCATION, - JOB_CONTEXT, JOB_DESCRIPTION, null, null, @@ -344,7 +341,6 @@ public class MarquezClientTest { INPUTS, OUTPUTS, LOCATION, - JOB_CONTEXT, JOB_DESCRIPTION, new Run( RUN_ID, @@ -787,7 +783,6 @@ public void testCreateJob() throws Exception { .outputs(OUTPUTS) .location(LOCATION) .description(JOB_DESCRIPTION) - .context(JOB_CONTEXT) .build(); final String metaAsJson = JsonGenerator.newJsonFor(meta); final String jobAsJson = JsonGenerator.newJsonFor(JOB); @@ -808,7 +803,6 @@ public void testCreateJobWithRunId() throws Exception { .outputs(OUTPUTS) 
.location(LOCATION) .description(JOB_DESCRIPTION) - .context(JOB_CONTEXT) .runId(RUN_ID) .build(); final String metaAsJson = JsonGenerator.newJsonFor(meta); diff --git a/clients/java/src/test/java/marquez/client/models/JobMetaTest.java b/clients/java/src/test/java/marquez/client/models/JobMetaTest.java index 1696d3822c..ba7b4521a5 100644 --- a/clients/java/src/test/java/marquez/client/models/JobMetaTest.java +++ b/clients/java/src/test/java/marquez/client/models/JobMetaTest.java @@ -6,7 +6,6 @@ package marquez.client.models; import static marquez.client.models.JobType.BATCH; -import static marquez.client.models.ModelGenerator.newContext; import static marquez.client.models.ModelGenerator.newDescription; import static marquez.client.models.ModelGenerator.newJobMeta; import static marquez.client.models.ModelGenerator.newLocation; @@ -46,7 +45,6 @@ public void testBuilder_inputsAndOutputsSameNamespace() { .inputs(namespaceName, "a", "b", "c") .outputs(namespaceName, "d", "e") .location(newLocation()) - .context(newContext()) .description(newDescription()) .build(); diff --git a/clients/java/src/test/java/marquez/client/models/JsonGenerator.java b/clients/java/src/test/java/marquez/client/models/JsonGenerator.java index 7b2803bf16..ef1b7e1c27 100644 --- a/clients/java/src/test/java/marquez/client/models/JsonGenerator.java +++ b/clients/java/src/test/java/marquez/client/models/JsonGenerator.java @@ -245,14 +245,11 @@ public static String newJsonFor(final JobMeta meta) { final ArrayNode inputs = MAPPER.valueToTree(meta.getInputs()); final ArrayNode outputs = MAPPER.valueToTree(meta.getOutputs()); final ObjectNode obj = MAPPER.createObjectNode(); - final ObjectNode context = MAPPER.createObjectNode(); - meta.getContext().forEach(context::put); obj.put("type", meta.getType().toString()); obj.putArray("inputs").addAll(inputs); obj.putArray("outputs").addAll(outputs); obj.put("location", meta.getLocation().map(URL::toString).orElse(null)); - obj.set("context", context); 
obj.put("description", meta.getDescription().orElse(null)); obj.put("runId", meta.getRunId().orElse(null)); @@ -267,8 +264,6 @@ public static String newJsonFor(final Job job) { .put("name", job.getId().getName()); final ArrayNode inputs = MAPPER.valueToTree(job.getInputs()); final ArrayNode outputs = MAPPER.valueToTree(job.getOutputs()); - final ObjectNode context = MAPPER.createObjectNode(); - job.getContext().forEach(context::put); final ObjectNode obj = MAPPER.createObjectNode(); obj.set("id", id); @@ -281,7 +276,6 @@ public static String newJsonFor(final Job job) { obj.putArray("inputs").addAll(inputs); obj.putArray("outputs").addAll(outputs); obj.put("location", job.getLocation().map(URL::toString).orElse(null)); - obj.set("context", context); obj.put("description", job.getDescription().orElse(null)); obj.set("latestRun", toObj(job.getLatestRun().orElse(null))); obj.put("currentVersion", job.getCurrentVersion().map(UUID::toString).orElse(null)); diff --git a/clients/java/src/test/java/marquez/client/models/ModelGenerator.java b/clients/java/src/test/java/marquez/client/models/ModelGenerator.java index 0226e744d3..5a18683d3d 100644 --- a/clients/java/src/test/java/marquez/client/models/ModelGenerator.java +++ b/clients/java/src/test/java/marquez/client/models/ModelGenerator.java @@ -179,7 +179,6 @@ public static JobMeta newJobMeta() { .outputs(newOutputs(4)) .location(newLocation()) .description(newDescription()) - .context(newContext()) .build(); } @@ -206,7 +205,6 @@ public static Job newJobWith(final Run latestRun, UUID currentVersion) { newInputs(2), newOutputs(4), newLocation(), - newContext(), newDescription(), latestRun, null, @@ -302,11 +300,6 @@ public static URL newLocation() { String.format("https://github.com/test-org/test-repo/blob/%s/test.java", newId())); } - public static Map newContext() { - return ImmutableMap.of( - "sql", String.format("SELECT * FROM room_bookings WHERE room = '%dH';", newId())); - } - public static URL newSchemaLocation() { 
return Utils.toUrl("http://localhost:8081/schemas/ids/" + newId()); } diff --git a/lombok.config b/lombok.config index 7a21e88040..fdccd2329c 100644 --- a/lombok.config +++ b/lombok.config @@ -1 +1,2 @@ lombok.addLombokGeneratedAnnotation = true +lombok.nonNull.exceptionType=JDK \ No newline at end of file diff --git a/spec/openapi.yml b/spec/openapi.yml index 4e1d91fa07..663e1d5559 100644 --- a/spec/openapi.yml +++ b/spec/openapi.yml @@ -477,6 +477,24 @@ paths: schema: $ref: '#/components/schemas/IncompleteRun' + /jobs/runs/{id}/facets: + parameters: + - $ref: '#/components/parameters/runId' + - $ref: '#/components/parameters/type' + get: + operationId: getFacets + summary: Retrieve run or job facets for a run. + description: Retrieve run or job facets for a run. + tags: + - Jobs + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/FacetsResult' + /jobs/runs/{id}/start: parameters: - $ref: '#/components/parameters/runId' @@ -857,6 +875,17 @@ components: type: string format: date-time example: 2022-09-15T07:47:19Z + + type: + name: type + in: query + description: Indicates whether to return job or run facets. + required: true + schema: + type: string + enum: + - run + - job schemas: CreatedNamespace: @@ -1918,3 +1947,12 @@ components: enum: [DATASET, JOB] description: The type of search result. example: DATASET + + FacetsResult: + type: object + properties: + runId: + description: The ID of the run that the returned facets belong to. 
+ type: string + facets: + $ref: '#/components/schemas/CustomFacet' \ No newline at end of file diff --git a/web/src/components/datasets/DatasetInfo.tsx b/web/src/components/datasets/DatasetInfo.tsx index 55bb3c3fbf..8b9392af0c 100644 --- a/web/src/components/datasets/DatasetInfo.tsx +++ b/web/src/components/datasets/DatasetInfo.tsx @@ -8,19 +8,54 @@ import MqCode from '../core/code/MqCode' import MqEmpty from '../core/empty/MqEmpty' import MqJsonView from '../core/json-view/MqJsonView' import MqText from '../core/text/MqText' -import React, { FunctionComponent } from 'react' +import React, { FunctionComponent, useEffect } from 'react' import RunStatus from '../jobs/RunStatus' +import * as Redux from 'redux' +import { IState } from '../../store/reducers' +import { connect } from 'react-redux' +import { fetchJobFacets, resetFacets } from '../../store/actionCreators' -interface DatasetInfoProps { +export interface DispatchProps { + fetchJobFacets: typeof fetchJobFacets + resetFacets: typeof resetFacets +} + +interface JobFacets { + [key: string]: object +} + +export interface JobFacetsProps { + jobFacets: JobFacets +} + +export interface SqlFacet { + query: string +} + + +type DatasetInfoProps = { datasetFields: Field[] facets?: object run?: Run -} +} & JobFacetsProps & + DispatchProps const DatasetInfo: FunctionComponent = props => { - const { datasetFields, facets, run } = props + const { datasetFields, facets, run, jobFacets, fetchJobFacets, resetFacets } = props const i18next = require('i18next') + useEffect(() => { + run && fetchJobFacets(run.id) + }, []) + + // unmounting + useEffect( + () => () => { + resetFacets() + }, + [] + ) + return ( {datasetFields.length === 0 && ( @@ -86,11 +121,27 @@ const DatasetInfo: FunctionComponent = props => { {run.jobVersion.name} - + {jobFacets.sql && } )} ) } -export default DatasetInfo +const mapStateToProps = (state: IState) => ({ + jobFacets: state.facets.result +}) + +const mapDispatchToProps = (dispatch: Redux.Dispatch) 
=> + Redux.bindActionCreators( + { + fetchJobFacets: fetchJobFacets, + resetFacets: resetFacets + }, + dispatch + ) + +export default connect( + mapStateToProps, + mapDispatchToProps +)(DatasetInfo) diff --git a/web/src/components/jobs/RunInfo.tsx b/web/src/components/jobs/RunInfo.tsx index e106fab4e6..f684d63aa6 100644 --- a/web/src/components/jobs/RunInfo.tsx +++ b/web/src/components/jobs/RunInfo.tsx @@ -1,25 +1,59 @@ // Copyright 2018-2023 contributors to the Marquez project // SPDX-License-Identifier: Apache-2.0 - import { Box } from '@material-ui/core' import { Run } from '../../types/api' import { formatUpdatedAt } from '../../helpers' import MqCode from '../core/code/MqCode' import MqJsonView from '../core/json-view/MqJsonView' import MqText from '../core/text/MqText' -import React, { FunctionComponent } from 'react' +import React, { FunctionComponent, useEffect } from 'react' +import * as Redux from 'redux' +import { IState } from '../../store/reducers' +import { connect } from 'react-redux' +import { fetchJobFacets, resetFacets } from '../../store/actionCreators' -interface RunInfoProps { - run: Run +export interface DispatchProps { + fetchJobFacets: typeof fetchJobFacets + resetFacets: typeof resetFacets } +interface JobFacets { + [key: string]: object +} + +export interface JobFacetsProps { + jobFacets: JobFacets +} + +export interface SqlFacet { + query: string +} + + +type RunInfoProps = { + run: Run +} & JobFacetsProps & + DispatchProps + const RunInfo: FunctionComponent = props => { - const { run } = props + const { run, jobFacets, fetchJobFacets, resetFacets } = props const i18next = require('i18next') + useEffect(() => { + fetchJobFacets(run.id) + }, []) + + // unmounting + useEffect( + () => () => { + resetFacets() + }, + [] + ) + return ( - + {jobFacets.sql && } {formatUpdatedAt(run.updatedAt)} @@ -37,4 +71,21 @@ const RunInfo: FunctionComponent = props => { ) } -export default RunInfo + +const mapStateToProps = (state: IState) => ({ + jobFacets: 
state.facets.result +}) + +const mapDispatchToProps = (dispatch: Redux.Dispatch) => + Redux.bindActionCreators( + { + fetchJobFacets: fetchJobFacets, + resetFacets: resetFacets + }, + dispatch + ) + +export default connect( + mapStateToProps, + mapDispatchToProps +)(RunInfo) diff --git a/web/src/components/lineage/types.ts b/web/src/components/lineage/types.ts index 8cc3d573c2..0f32f786b9 100644 --- a/web/src/components/lineage/types.ts +++ b/web/src/components/lineage/types.ts @@ -36,9 +36,6 @@ export interface LineageJob { inputs: { namespace: string; name: string }[] outputs: { namespace: string; name: string }[] location: string - context: { - [key: string]: string - } description: string latestRun: Nullable } diff --git a/web/src/helpers/nodes.ts b/web/src/helpers/nodes.ts index 1b4602f4d6..aa3c758d7a 100644 --- a/web/src/helpers/nodes.ts +++ b/web/src/helpers/nodes.ts @@ -1,7 +1,7 @@ // Copyright 2018-2023 contributors to the Marquez project // SPDX-License-Identifier: Apache-2.0 -import { EventType, Facets, Run, RunState } from '../types/api' +import { EventType, DataQualityFacets, Run, RunState } from '../types/api' import { JobOrDataset, LineageDataset, LineageJob, MqNode } from '../components/lineage/types' import { Undefinable } from '../types/util/Nullable' import { theme } from './theme' @@ -108,7 +108,7 @@ export function jobRunsStatus(runs: Run[], limit = 14) { } } -export function datasetFacetsStatus(facets: Facets, limit = 14) { +export function datasetFacetsStatus(facets: DataQualityFacets, limit = 14) { const assertions = facets?.dataQualityAssertions?.assertions?.slice(-limit) if (!assertions?.length) { diff --git a/web/src/store/actionCreators/actionTypes.ts b/web/src/store/actionCreators/actionTypes.ts index beffb44095..2cc9b5397a 100644 --- a/web/src/store/actionCreators/actionTypes.ts +++ b/web/src/store/actionCreators/actionTypes.ts @@ -49,3 +49,9 @@ export const RESET_LINEAGE = 'RESET_LINEAGE' // search export const FETCH_SEARCH = 
'FETCH_SEARCH' export const FETCH_SEARCH_SUCCESS = 'FETCH_SEARCH _SUCCESS' + +// facets +export const FETCH_RUN_FACETS = 'FETCH_RUN_FACETS' +export const FETCH_JOB_FACETS = 'FETCH_JOB_FACETS' +export const FETCH_FACETS_SUCCESS = 'FETCH_FACETS_SUCCESS' +export const RESET_FACETS = 'RESET_FACETS' diff --git a/web/src/store/actionCreators/index.ts b/web/src/store/actionCreators/index.ts index 4dd16757c0..4e8c5043c9 100644 --- a/web/src/store/actionCreators/index.ts +++ b/web/src/store/actionCreators/index.ts @@ -7,6 +7,7 @@ import { Dataset, DatasetVersion, Event, + Facets, Job, LineageGraph, Namespace, @@ -155,6 +156,31 @@ export const fetchRunsSuccess = (jobName: string, jobRuns: Run[]) => ({ } }) +export const fetchRunFacets = (runId: string) => ({ + type: actionTypes.FETCH_RUN_FACETS, + payload: { + runId + } +}) + +export const fetchJobFacets = (runId: string) => ({ + type: actionTypes.FETCH_JOB_FACETS, + payload: { + runId + } +}) + +export const fetchFacetsSuccess = (facets: Facets) => ({ + type: actionTypes.FETCH_FACETS_SUCCESS, + payload: { + facets: facets.facets + } +}) + +export const resetFacets = () => ({ + type: actionTypes.RESET_FACETS +}) + export const resetRuns = () => ({ type: actionTypes.RESET_RUNS }) diff --git a/web/src/store/reducers/facets.ts b/web/src/store/reducers/facets.ts new file mode 100644 index 0000000000..688d214292 --- /dev/null +++ b/web/src/store/reducers/facets.ts @@ -0,0 +1,33 @@ +// Copyright 2018-2023 contributors to the Marquez project +// SPDX-License-Identifier: Apache-2.0 + +import { + FETCH_FACETS_SUCCESS, + FETCH_JOB_FACETS, + FETCH_RUN_FACETS, + RESET_FACETS +} from '../actionCreators/actionTypes' +import { Facets } from '../../types/api' +import { fetchFacetsSuccess } from '../actionCreators' + +export type IFacetsState = { isLoading: boolean; result: object; init: boolean } + +export const initialState: IFacetsState = { isLoading: false, result: {} as Facets, init: false } + +type IFacetsAction = ReturnType + +export 
default (state = initialState, action: IFacetsAction): IFacetsState => { + const { type, payload } = action + + switch (type) { + case FETCH_JOB_FACETS: + case FETCH_RUN_FACETS: + return { ...state, isLoading: true } + case FETCH_FACETS_SUCCESS: + return { ...state, isLoading: false, init: true, result: payload.facets } + case RESET_FACETS: + return initialState + default: + return state + } +} diff --git a/web/src/store/reducers/index.ts b/web/src/store/reducers/index.ts index 119a3d2a9d..46cffe8653 100644 --- a/web/src/store/reducers/index.ts +++ b/web/src/store/reducers/index.ts @@ -9,6 +9,7 @@ import datasetVersions, { IDatasetVersionsState } from './datasetVersions' import datasets, { IDatasetsState } from './datasets' import display, { IDisplayState } from './display' import events, { IEventsState } from './events' +import facets, { IFacetsState } from './facets' import jobs, { IJobsState } from './jobs' import lineage, { ILineageState } from './lineage' import namespaces, { INamespacesState } from './namespaces' @@ -27,6 +28,7 @@ export interface IState { router: any lineage: ILineageState search: ISearchState + facets: IFacetsState } export default (history: History): Reducer => @@ -41,5 +43,6 @@ export default (history: History): Reducer => namespaces, display, lineage, - search + search, + facets }) diff --git a/web/src/store/requests/facets.ts b/web/src/store/requests/facets.ts new file mode 100644 index 0000000000..23d9a2d1bf --- /dev/null +++ b/web/src/store/requests/facets.ts @@ -0,0 +1,16 @@ +// Copyright 2018-2023 contributors to the Marquez project +// SPDX-License-Identifier: Apache-2.0 + +import { API_URL } from '../../globals' +import { Facets } from '../../types/api' +import { genericFetchWrapper } from './index' + +export const getRunFacets = async (runId: string) => { + const url = `${API_URL}/jobs/runs/${runId}/facets?type=run` + return genericFetchWrapper(url, { method: 'GET' }, 'fetchRunFacets').then((r: Facets) => r) +} + +export const 
getJobFacets = async (runId: string) => { + const url = `${API_URL}/jobs/runs/${runId}/facets?type=job` + return genericFetchWrapper(url, { method: 'GET' }, 'fetchJobFacets').then((r: Facets) => r) +} \ No newline at end of file diff --git a/web/src/store/requests/index.ts b/web/src/store/requests/index.ts index 415724d04a..5ee96c1674 100644 --- a/web/src/store/requests/index.ts +++ b/web/src/store/requests/index.ts @@ -42,5 +42,6 @@ export const genericFetchWrapper = async (url: string, params: IParams, function export * from './datasets' export * from './events' +export * from './facets' export * from './namespaces' export * from './jobs' diff --git a/web/src/store/sagas/index.ts b/web/src/store/sagas/index.ts index b82df560b8..8f7bffe6d8 100644 --- a/web/src/store/sagas/index.ts +++ b/web/src/store/sagas/index.ts @@ -10,8 +10,10 @@ import { FETCH_DATASET_VERSIONS, FETCH_EVENTS, FETCH_JOBS, + FETCH_JOB_FACETS, FETCH_LINEAGE, FETCH_RUNS, + FETCH_RUN_FACETS, FETCH_SEARCH } from '../actionCreators/actionTypes' import { Namespaces } from '../../types/api' @@ -27,6 +29,7 @@ import { fetchDatasetVersionsSuccess, fetchDatasetsSuccess, fetchEventsSuccess, + fetchFacetsSuccess, fetchJobsSuccess, fetchLineageSuccess, fetchNamespacesSuccess, @@ -40,8 +43,10 @@ import { getDatasetVersions, getDatasets, getEvents, + getJobFacets, getJobs, getNamespaces, + getRunFacets, getRuns } from '../requests' import { getLineage } from '../requests/lineage' @@ -177,6 +182,30 @@ export function* fetchDatasetVersionsSaga() { } } +export function* fetchJobFacetsSaga() { + while (true) { + try { + const { payload } = yield take(FETCH_JOB_FACETS) + const jobFacets = yield call(getJobFacets, payload.runId) + yield put(fetchFacetsSuccess(jobFacets)) + } catch (e) { + yield put(applicationError('Something went wrong while fetching job facets')) + } + } +} + +export function* fetchRunFacetsSaga() { + while (true) { + try { + const { payload } = yield take(FETCH_RUN_FACETS) + const runFacets = 
yield call(getRunFacets, payload.runId) + yield put(fetchFacetsSuccess(runFacets)) + } catch (e) { + yield put(applicationError('Something went wrong while fetching run facets')) + } + } +} + export default function* rootSaga(): Generator { const sagasThatAreKickedOffImmediately = [fetchNamespaces()] const sagasThatWatchForAction = [ @@ -186,6 +215,8 @@ export default function* rootSaga(): Generator { fetchDatasetSaga(), fetchDatasetVersionsSaga(), fetchEventsSaga(), + fetchJobFacetsSaga(), + fetchRunFacetsSaga(), fetchLineage(), fetchSearch(), deleteJobSaga(), diff --git a/web/src/types/api.ts b/web/src/types/api.ts index 04d939af4e..9bdc92fcd0 100644 --- a/web/src/types/api.ts +++ b/web/src/types/api.ts @@ -82,7 +82,7 @@ export interface DatasetVersions { versions: DatasetVersion[] } -export interface Facets { +export interface DataQualityFacets { dataQualityAssertions?: { assertions?: { assertion: string @@ -144,9 +144,6 @@ export interface Job { outputs: DatasetId[] namespace: string location: string - context: { - [key: string]: string - } description: string latestRun: Run } @@ -164,9 +161,6 @@ export interface Runs { export interface Run { id: string - context: { - sql?: string - } createdAt: string updatedAt: string nominalStartTime: string @@ -211,3 +205,10 @@ export interface GroupedSearchResult { results: Map rawResults: GroupedSearch[] } + +export interface Facets { + runId: string + facets: { + [key: string]: object + } +}