From 87b10317ce6e9fbca703cfede6bbf2cf11c09f75 Mon Sep 17 00:00:00 2001 From: Willy Lulciuc Date: Wed, 10 Feb 2021 10:29:19 -0800 Subject: [PATCH 1/2] Add installation steps for marquez-spark [skip ci] (#938) Signed-off-by: wslulciuc --- clients/java/README.md | 4 ++-- integrations/spark/README.md | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/clients/java/README.md b/clients/java/README.md index 43e0c48b56..852ec4621e 100644 --- a/clients/java/README.md +++ b/clients/java/README.md @@ -10,14 +10,14 @@ Maven: io.github.marquezproject marquez-java - 0.4.3 + 0.12.0 ``` or Gradle: ```groovy -implementation 'io.github.marquezproject:marquez-java:0.4.3' +implementation 'io.github.marquezproject:marquez-java:0.12.0' ``` ## Usage diff --git a/integrations/spark/README.md b/integrations/spark/README.md index be55beac1d..64960f2cf9 100644 --- a/integrations/spark/README.md +++ b/integrations/spark/README.md @@ -1,10 +1,29 @@ # Marquez Spark Agent + The Marquez Spark Agent uses jvm instrumentation to emit OpenLineage metadata to Marquez. +## Installation + +Maven: + +```xml + + io.github.marquezproject + marquez-spark + 0.12.0 + +``` + +or Gradle: + +```groovy +implementation 'io.github.marquezproject:marquez-spark:0.12.0' +``` ## Getting started ### Dataproc + Dataproc requires two things: a uri to the marquez java agent jar in the `files` parameter and an additional spark property. Dataproc will copy the agent jar to the current working directory of the executor and the `-javaagent` parameter will load it on execution. @@ -36,6 +55,7 @@ t1 = DataProcPySparkOperator( ``` ## Arguments + The java agent accepts an argument in the form of a uri. It includes the location of Marquez, the namespace name, the job name, and a unique run id. The run id will be emitted as a parent run facet. From 77d14045dacd8a5dcf16085c2b7e4f4c35062c0b Mon Sep 17 00:00:00 2001 From: henneberger Date: Wed, 10 Feb 2021 10:55:42 -0800 Subject: [PATCH 2/2] Denormalize namespace name to job version table (#935) * Denormalize namespace name to job table Signed-off-by: henneberger * Add spotless Signed-off-by: henneberger * Denormalize namespace & job name to job_versions Signed-off-by: henneberger Co-authored-by: Willy Lulciuc --- api/src/main/java/marquez/db/JobDao.java | 17 ++- .../main/java/marquez/db/JobVersionDao.java | 103 ++++++++++-------- .../main/java/marquez/db/OpenLineageDao.java | 6 +- api/src/main/java/marquez/db/RunDao.java | 4 +- .../mappers/ExtendedJobVersionRowMapper.java | 7 +- .../db/models/ExtendedJobVersionRow.java | 8 +- .../java/marquez/db/models/JobVersionRow.java | 3 + .../main/java/marquez/service/JobService.java | 30 ++++- .../java/marquez/service/mappers/Mapper.java | 10 +- .../V20__alter_jobs_to_add_namespace_name.sql | 2 + ...1__alter_job_versions_to_add_namespace.sql | 13 +++ .../java/marquez/service/JobServiceTest.java | 6 +- 12 files changed, 142 insertions(+), 67 deletions(-) create mode 100644 api/src/main/resources/marquez/db/migration/V20__alter_jobs_to_add_namespace_name.sql create mode 100644 api/src/main/resources/marquez/db/migration/V21__alter_job_versions_to_add_namespace.sql diff --git a/api/src/main/java/marquez/db/JobDao.java b/api/src/main/java/marquez/db/JobDao.java index 1c83dad732..15b4f4ae6f 100644 --- a/api/src/main/java/marquez/db/JobDao.java +++ b/api/src/main/java/marquez/db/JobDao.java @@ -35,6 +35,7 @@ public interface JobDao { + "created_at, " + "updated_at, " + "namespace_uuid, " + + "namespace_name, " + "name, " + "description, " + "current_version_uuid" @@ -44,6 +45,7 @@ public interface JobDao { + ":createdAt, " + ":updatedAt, " + ":namespaceUuid, " + + ":namespaceName, " + ":name, " + ":description, " + ":currentVersionUuid)") @@ -80,9 +82,8 @@ public interface JobDao { Optional find(String namespaceName, String jobName); @SqlQuery( - "SELECT j.*, n.name AS namespace_name FROM jobs AS j " - + "INNER JOIN namespaces AS n " - + " ON (n.name = :namespaceName AND j.namespace_uuid = n.uuid) " + "SELECT j.* FROM jobs AS j " + + "WHERE namespace_name = :namespaceName " + "ORDER BY j.name " + "LIMIT :limit OFFSET :offset") List findAll(String namespaceName, int limit, int offset); @@ -97,6 +98,7 @@ public interface JobDao { + "created_at, " + "updated_at, " + "namespace_uuid, " + + "namespace_name, " + "name, " + "description " + ") VALUES ( " @@ -105,6 +107,7 @@ public interface JobDao { + ":now, " + ":now, " + ":namespaceUuid, " + + ":namespaceName, " + ":name, " + ":description " + ") ON CONFLICT (name, namespace_uuid) DO " @@ -114,5 +117,11 @@ public interface JobDao { + "description = EXCLUDED.description " + "RETURNING *") JobRow upsert( - UUID uuid, JobType type, Instant now, UUID namespaceUuid, String name, String description); + UUID uuid, + JobType type, + Instant now, + UUID namespaceUuid, + String namespaceName, + String name, + String description); } diff --git a/api/src/main/java/marquez/db/JobVersionDao.java b/api/src/main/java/marquez/db/JobVersionDao.java index cc257238da..912ac69063 100644 --- a/api/src/main/java/marquez/db/JobVersionDao.java +++ b/api/src/main/java/marquez/db/JobVersionDao.java @@ -25,6 +25,7 @@ import org.jdbi.v3.sqlobject.CreateSqlObject; import org.jdbi.v3.sqlobject.SqlObject; import org.jdbi.v3.sqlobject.config.RegisterRowMapper; +import org.jdbi.v3.sqlobject.customizer.BindBean; import org.jdbi.v3.sqlobject.statement.SqlQuery; import org.jdbi.v3.sqlobject.statement.SqlUpdate; import org.jdbi.v3.sqlobject.transaction.Transaction; @@ -41,50 +42,51 @@ enum IoType { @Transaction default void insert(@NonNull JobVersionRow row) { - withHandle( - handle -> - handle - .createUpdate( - "INSERT INTO job_versions (" - + "uuid, " - + "created_at, " - + "updated_at, " - + "job_uuid, " - + "version, " - + "location, " - + "latest_run_uuid, " - + "job_context_uuid" - + ") VALUES (" - + ":uuid, " - + ":createdAt, " - + ":updateAt, " - + ":jobUuid, " - + ":version, " - + ":location, " - + ":latestRunUuid, " - + ":jobContextUuid)") - .bindBean(row) - .execute()); + insert_job_only(row); + // I/O - row.getInputUuids().forEach(inputUuid -> updateInputs(row.getUuid(), inputUuid)); - row.getOutputUuids().forEach(outputUuid -> updateOutputs(row.getUuid(), outputUuid)); + for (UUID inputUuid : row.getInputUuids()) { + updateInputsOrOutputs(row.getUuid(), inputUuid, IoType.INPUT.name()); + } + for (UUID outputUuid : row.getOutputUuids()) { + updateInputsOrOutputs(row.getUuid(), outputUuid, IoType.OUTPUT.name()); + } // Version final Instant updatedAt = row.getCreatedAt(); createJobDao().updateVersion(row.getJobUuid(), updatedAt, row.getUuid()); } + @SqlUpdate( + "INSERT INTO job_versions (" + + "uuid, " + + "created_at, " + + "updated_at, " + + "job_uuid, " + + "version, " + + "location, " + + "latest_run_uuid, " + + "job_name, " + + "namespace_uuid, " + + "namespace_name, " + + "job_context_uuid" + + ") VALUES (" + + ":uuid, " + + ":createdAt, " + + ":updateAt, " + + ":jobUuid, " + + ":version, " + + ":location, " + + ":latestRunUuid, " + + ":jobName, " + + ":namespaceUuid, " + + ":namespaceName, " + + ":jobContextUuid)") + void insert_job_only(@BindBean JobVersionRow row); + @SqlQuery("SELECT EXISTS (SELECT 1 FROM job_versions WHERE version = :version)") boolean exists(UUID version); - default void updateInputs(UUID versionUuid, UUID inputUuid) { - updateInputsOrOutputs(versionUuid, inputUuid, IoType.INPUT.name()); - } - - default void updateOutputs(UUID versionUuid, UUID outputUuid) { - updateInputsOrOutputs(versionUuid, outputUuid, IoType.OUTPUT.name()); - } - @SqlUpdate( "INSERT INTO job_versions_io_mapping (job_version_uuid, dataset_uuid, io_type) " + "VALUES (:versionUuid, :datasetUuid, :ioType)") @@ -98,7 +100,7 @@ default void updateOutputs(UUID versionUuid, UUID outputUuid) { void updateLatestRun(UUID rowUuid, Instant updatedAt, UUID latestRunUuid); final String EXTENDED_SELECT = - "SELECT j.namespace_uuid, jv.*, jc.uuid AS job_context_uuid, jc.context, n.name as namespace_name, j.name, " + "SELECT jv.namespace_uuid, jv.*, jc.uuid AS job_context_uuid, jc.context, jv.namespace_name, jv.job_name as name, " + "ARRAY(SELECT dataset_uuid " + " FROM job_versions_io_mapping " + " WHERE job_version_uuid = jv.uuid AND " @@ -108,10 +110,6 @@ default void updateOutputs(UUID versionUuid, UUID outputUuid) { + " WHERE job_version_uuid = jv.uuid AND " + " io_type = 'OUTPUT') AS output_uuids " + "FROM job_versions AS jv " - + "INNER JOIN jobs AS j " - + " ON j.uuid = jv.job_uuid " - + "INNER JOIN namespaces AS n " - + " ON j.namespace_uuid = n.uuid " + "INNER JOIN job_contexts AS jc " + " ON job_context_uuid = jc.uuid "; @@ -120,7 +118,9 @@ default void updateOutputs(UUID versionUuid, UUID outputUuid) { @SqlQuery( EXTENDED_SELECT - + "WHERE n.name = :namespaceName AND j.name = :jobName AND j.current_version_uuid = jv.uuid " + + "INNER JOIN jobs AS j " + + " ON j.uuid = jv.job_uuid " + + "WHERE jv.namespace_name = :namespaceName AND jv.job_name = :jobName AND j.current_version_uuid = jv.uuid " + "ORDER BY created_at DESC " + "LIMIT 1") Optional findLatest(String namespaceName, String jobName); @@ -130,7 +130,7 @@ default void updateOutputs(UUID versionUuid, UUID outputUuid) { @SqlQuery( EXTENDED_SELECT - + "WHERE n.name = :namespaceName AND j.name = :jobName " + + "WHERE jv.namespace_name = :namespaceName AND jv.job_name = :jobName " + "ORDER BY created_at DESC " + "LIMIT :limit OFFSET :offset") List findAll(String namespaceName, String jobName, int limit, int offset); @@ -146,7 +146,10 @@ default void updateOutputs(UUID versionUuid, UUID outputUuid) { + "job_uuid, " + "job_context_uuid, " + "location," - + "version" + + "version," + + "job_name," + + "namespace_uuid," + + "namespace_name" + ") VALUES (" + ":uuid, " + ":now, " @@ -154,15 +157,25 @@ default void updateOutputs(UUID versionUuid, UUID outputUuid) { + ":jobUuid, " + ":jobContextUuid, " + ":location, " - + ":version) " + + ":version, " + + ":jobName, " + + ":namespaceUuid, " + + ":namespaceName) " + "ON CONFLICT(version) DO " + "UPDATE SET " + "updated_at = EXCLUDED.updated_at, " - + "job_uuid = EXCLUDED.job_uuid, " + "job_context_uuid = EXCLUDED.job_context_uuid " + "RETURNING *") ExtendedJobVersionRow upsert( - UUID uuid, Instant now, UUID jobUuid, UUID jobContextUuid, String location, UUID version); + UUID uuid, + Instant now, + UUID jobUuid, + UUID jobContextUuid, + String location, + UUID version, + String jobName, + UUID namespaceUuid, + String namespaceName); @SqlUpdate( "INSERT INTO job_versions_io_mapping (" diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 78f766e046..b6d91699f5 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -106,6 +106,7 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event) { getJobType(event.getJob()), now, namespace.getUuid(), + namespace.getName(), event.getJob().getName(), description); bag.setJob(job); @@ -128,7 +129,10 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event) { job.getUuid(), jobContext.getUuid(), location, - buildJobVersion(event, context)); + buildJobVersion(event, context), + job.getName(), + job.getNamespaceUuid(), + job.getNamespaceName()); bag.setJobVersion(jobVersion); jobDao.updateVersion(job.getUuid(), Instant.now(), jobVersion.getUuid()); diff --git a/api/src/main/java/marquez/db/RunDao.java b/api/src/main/java/marquez/db/RunDao.java index a36127ce52..0d920d8abf 100644 --- a/api/src/main/java/marquez/db/RunDao.java +++ b/api/src/main/java/marquez/db/RunDao.java @@ -150,9 +150,7 @@ default void updateJobVersionUuid(UUID rowUuid, Instant updatedAt, UUID jobVersi @SqlQuery( SELECT_RUN + "INNER JOIN job_versions AS jv ON r.job_version_uuid = jv.uuid " - + "INNER JOIN jobs AS j ON jv.job_uuid = j.uuid " - + "INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid " - + "WHERE n.name = :namespace and j.name = :jobName " + + "WHERE jv.namespace_name = :namespace and jv.job_name = :jobName " + "ORDER BY r.created_at DESC " + "LIMIT :limit OFFSET :offset") List findAll(String namespace, String jobName, int limit, int offset); diff --git a/api/src/main/java/marquez/db/mappers/ExtendedJobVersionRowMapper.java b/api/src/main/java/marquez/db/mappers/ExtendedJobVersionRowMapper.java index 9de519dc6b..830907a8a6 100644 --- a/api/src/main/java/marquez/db/mappers/ExtendedJobVersionRowMapper.java +++ b/api/src/main/java/marquez/db/mappers/ExtendedJobVersionRowMapper.java @@ -55,9 +55,8 @@ public ExtendedJobVersionRow map(@NonNull ResultSet results, @NonNull StatementC uuidOrThrow(results, Columns.VERSION), uuidOrNull(results, Columns.LATEST_RUN_UUID), columnNames.contains(Columns.CONTEXT) ? stringOrThrow(results, Columns.CONTEXT) : "", - columnNames.contains(Columns.NAMESPACE_NAME) - ? stringOrThrow(results, Columns.NAMESPACE_NAME) - : "", - columnNames.contains(Columns.NAME) ? stringOrThrow(results, Columns.NAME) : ""); + stringOrThrow(results, Columns.NAMESPACE_NAME), + columnNames.contains(Columns.NAME) ? stringOrThrow(results, Columns.NAME) : "", + uuidOrThrow(results, Columns.NAMESPACE_UUID)); } } diff --git a/api/src/main/java/marquez/db/models/ExtendedJobVersionRow.java b/api/src/main/java/marquez/db/models/ExtendedJobVersionRow.java index db3a9c84ec..3499d2cb92 100644 --- a/api/src/main/java/marquez/db/models/ExtendedJobVersionRow.java +++ b/api/src/main/java/marquez/db/models/ExtendedJobVersionRow.java @@ -42,18 +42,22 @@ public ExtendedJobVersionRow( final UUID latestRunUuid, @NonNull final String context, @NonNull final String namespaceName, - @NonNull final String name) { + @NonNull final String name, + @NonNull final UUID namespaceUuid) { super( uuid, createdAt, updatedAt, jobUuid, + name, jobContextUuid, inputUuids, outputUuids, location, version, - latestRunUuid); + latestRunUuid, + namespaceUuid, + namespaceName); this.context = context; this.namespaceName = namespaceName; this.name = name; diff --git a/api/src/main/java/marquez/db/models/JobVersionRow.java b/api/src/main/java/marquez/db/models/JobVersionRow.java index 5ef1312bf9..d2de30d81b 100644 --- a/api/src/main/java/marquez/db/models/JobVersionRow.java +++ b/api/src/main/java/marquez/db/models/JobVersionRow.java @@ -33,12 +33,15 @@ public class JobVersionRow { @Getter @NonNull private final Instant createdAt; @Getter @NonNull private final Instant updateAt; @Getter @NonNull private final UUID jobUuid; + @Getter @NonNull private final String jobName; @Getter @NonNull private final UUID jobContextUuid; @Getter @NonNull private final List inputUuids; @Getter @NonNull private final List outputUuids; @Nullable private final String location; @Getter @NonNull private final UUID version; @Nullable private final UUID latestRunUuid; + @Getter @NonNull private final UUID namespaceUuid; + @Getter @NonNull private final String namespaceName; public boolean hasInputUuids() { return !inputUuids.isEmpty(); diff --git a/api/src/main/java/marquez/service/JobService.java b/api/src/main/java/marquez/service/JobService.java index ef259b59fd..35a8d2bd03 100644 --- a/api/src/main/java/marquez/service/JobService.java +++ b/api/src/main/java/marquez/service/JobService.java @@ -18,6 +18,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static java.util.stream.Collectors.groupingBy; +import static marquez.db.OpenLineageDao.DEFAULT_NAMESPACE_OWNER; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -97,12 +98,17 @@ public JobService( public Job createOrUpdate( @NonNull NamespaceName namespaceName, @NonNull JobName jobName, @NonNull JobMeta jobMeta) throws MarquezServiceException { + NamespaceRow namespace = + namespaceDao.upsert( + UUID.randomUUID(), Instant.now(), namespaceName.getValue(), DEFAULT_NAMESPACE_OWNER); JobRow job = getOrCreateJobRow(namespaceName, jobName, jobMeta); final Version jobVersion = jobMeta.version(namespaceName, jobName); if (!jobVersionDao.exists(jobVersion.getValue())) { UUID jobVersionUuid = - createJobVersion(job.getUuid(), jobVersion, namespaceName, jobName, jobMeta).getUuid(); + createJobVersion( + job.getUuid(), jobVersion, namespace.getUuid(), namespaceName, jobName, jobMeta) + .getUuid(); updateRunFromJobMeta(jobMeta, jobVersionUuid, namespaceName, jobName); // Get a new job as versions have been attached return get(namespaceName, jobName).get(); @@ -127,6 +133,7 @@ private JobRow getOrCreateJobRow(NamespaceName namespaceName, JobName jobName, J private JobVersionRow createJobVersion( UUID jobId, Version jobVersion, + UUID namespaceUuid, NamespaceName namespaceName, JobName jobName, JobMeta jobMeta) { @@ -143,7 +150,10 @@ private JobVersionRow createJobVersion( mapDatasetToUuid(inputRows), mapDatasetToUuid(outputRows), jobMeta.getLocation().orElse(null), - jobVersion); + jobVersion, + jobName.getValue(), + namespaceUuid, + namespaceName.getValue()); JobMetrics.emitVersionMetric( namespaceName.getValue(), jobMeta.getType().toString(), jobName.getValue()); @@ -215,9 +225,21 @@ private JobVersionRow createJobVersionRow( List input, List output, URL location, - Version jobVersion) { + Version jobVersion, + String jobName, + UUID namespaceUuid, + String namespaceName) { final JobVersionRow newJobVersionRow = - Mapper.toJobVersionRow(jobRowId, contextRowId, input, output, location, jobVersion); + Mapper.toJobVersionRow( + jobRowId, + jobName, + contextRowId, + input, + output, + location, + jobVersion, + namespaceUuid, + namespaceName); jobVersionDao.insert(newJobVersionRow); return newJobVersionRow; diff --git a/api/src/main/java/marquez/service/mappers/Mapper.java b/api/src/main/java/marquez/service/mappers/Mapper.java index 462c2bd220..5c492aabb0 100644 --- a/api/src/main/java/marquez/service/mappers/Mapper.java +++ b/api/src/main/java/marquez/service/mappers/Mapper.java @@ -406,23 +406,29 @@ public static JobContextRow toJobContextRow( public static JobVersionRow toJobVersionRow( @NonNull final UUID jobRowUuid, + @NonNull final String jobName, @NonNull final UUID jobContextRowUuid, @NonNull final List inputs, @NonNull final List outputs, @Nullable final URL location, - @NonNull final Version version) { + @NonNull final Version version, + @NonNull final UUID namespaceUuid, + @NonNull final String namespaceName) { final Instant now = newTimestamp(); return new JobVersionRow( newRowUuid(), now, now, jobRowUuid, + jobName, jobContextRowUuid, inputs, outputs, (location == null) ? null : location.toString(), version.getValue(), - null); + null, + namespaceUuid, + namespaceName); } public static Run toRun(@NonNull final ExtendedRunRow row) { diff --git a/api/src/main/resources/marquez/db/migration/V20__alter_jobs_to_add_namespace_name.sql b/api/src/main/resources/marquez/db/migration/V20__alter_jobs_to_add_namespace_name.sql new file mode 100644 index 0000000000..ad3f0a5f2a --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V20__alter_jobs_to_add_namespace_name.sql @@ -0,0 +1,2 @@ +ALTER TABLE jobs ADD namespace_name VARCHAR(255); +UPDATE jobs SET namespace_name = namespaces.name FROM namespaces WHERE jobs.namespace_uuid = namespaces.uuid; \ No newline at end of file diff --git a/api/src/main/resources/marquez/db/migration/V21__alter_job_versions_to_add_namespace.sql b/api/src/main/resources/marquez/db/migration/V21__alter_job_versions_to_add_namespace.sql new file mode 100644 index 0000000000..6186c1a5c1 --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V21__alter_job_versions_to_add_namespace.sql @@ -0,0 +1,13 @@ +ALTER TABLE job_versions ADD namespace_uuid UUID; +ALTER TABLE job_versions ADD namespace_name VARCHAR(255); +ALTER TABLE job_versions ADD job_name VARCHAR(255); + +UPDATE job_versions SET + namespace_uuid = jobs.namespace_uuid, + namespace_name = jobs.namespace_name, + job_name = jobs.name +FROM jobs +WHERE job_versions.job_uuid = jobs.uuid; + +CREATE INDEX job_versions_selector + ON job_versions (job_name, namespace_name); \ No newline at end of file diff --git a/api/src/test/java/marquez/service/JobServiceTest.java b/api/src/test/java/marquez/service/JobServiceTest.java index 59a1c8b951..6a5fdc8129 100644 --- a/api/src/test/java/marquez/service/JobServiceTest.java +++ b/api/src/test/java/marquez/service/JobServiceTest.java @@ -25,6 +25,7 @@ import static marquez.db.models.ModelGenerator.newNamespaceRowWith; import static marquez.db.models.ModelGenerator.newRowUuid; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -137,7 +138,8 @@ public class JobServiceTest { null, JOB_CONTEXT_ROW.getContext(), NAMESPACE_NAME.getValue(), - JOB_NAME.getValue()); + JOB_NAME.getValue(), + NAMESPACE_ROW.getUuid()); @Rule public MockitoRule rule = MockitoJUnit.rule(); @@ -196,7 +198,7 @@ public void notify(RunTransition transition) { public void testCreateOrUpdate() throws MarquezServiceException { when(namespaceDao.findBy(NAMESPACE_NAME.getValue())).thenReturn(Optional.of(NAMESPACE_ROW)); when(jobVersionDao.findBy(JOB_VERSION_ROW.getUuid())).thenReturn(Optional.of(JOB_VERSION_ROW)); - + when(namespaceDao.upsert(any(), any(), any(), any())).thenReturn(NAMESPACE_ROW); final String checksum = Utils.checksumFor(JOB_META.getContext()); when(jobContextDao.exists(checksum)).thenReturn(false); when(jobContextDao.findBy(checksum)).thenReturn(Optional.of(JOB_CONTEXT_ROW));