diff --git a/CHANGELOG.md b/CHANGELOG.md
index 66871df840..c9b0e02e40 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,14 @@

 ## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.29.0...HEAD)

+### Added
+
+* Split the `lineage_events` table into `dataset_facets`, `run_facets`, and `job_facets` tables. [`2350`](https://github.com/MarquezProject/marquez/pull/2350), [`2355`](https://github.com/MarquezProject/marquez/pull/2355), [`2359`](https://github.com/MarquezProject/marquez/pull/2359)
+  [@wslulciuc](https://github.com/wslulciuc), [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
+  * Performance improvements when storing and querying facets.
+  * The migration procedure requires manual steps if the database has more than 100K lineage events or a Postgres version < 13.
+  * We highly encourage users to review our [migration plan](https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V57__readme.md).
+
 ## [0.29.0](https://github.com/MarquezProject/marquez/compare/0.28.0...0.29.0) - 2022-12-19

 ### Added
diff --git a/api/src/main/java/marquez/MarquezApp.java b/api/src/main/java/marquez/MarquezApp.java
index caecffb14a..feac31f882 100644
--- a/api/src/main/java/marquez/MarquezApp.java
+++ b/api/src/main/java/marquez/MarquezApp.java
@@ -27,6 +27,7 @@
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import marquez.api.filter.JobRedirectFilter;
+import marquez.cli.DbMigrationCommand;
 import marquez.cli.MetadataCommand;
 import marquez.cli.SeedCommand;
 import marquez.common.Utils;
@@ -149,6 +150,12 @@ public void registerResources(
     }
   }

+  @Override
+  protected void addDefaultCommands(Bootstrap<MarquezConfig> bootstrap) {
+    bootstrap.addCommand(new DbMigrationCommand<>(this));
+    super.addDefaultCommands(bootstrap);
+  }
+
   private MarquezContext buildMarquezContext(
       MarquezConfig config, Environment env, ManagedDataSource source) {
     final JdbiFactory factory = new JdbiFactory();
diff --git a/api/src/main/java/marquez/cli/DbMigrationCommand.java b/api/src/main/java/marquez/cli/DbMigrationCommand.java
new file mode 100644
index 0000000000..34953fddcd
--- /dev/null
+++ b/api/src/main/java/marquez/cli/DbMigrationCommand.java
@@ -0,0 +1,113 @@
/*
 * Copyright 2018-2022 contributors to the Marquez project
 * SPDX-License-Identifier: Apache-2.0
 */

package marquez.cli;

import io.dropwizard.Application;
import io.dropwizard.cli.EnvironmentCommand;
import io.dropwizard.db.DataSourceFactory;
import io.dropwizard.db.ManagedDataSource;
import io.dropwizard.jdbi3.JdbiFactory;
import io.dropwizard.setup.Environment;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import marquez.db.migrations.V57_1__BackfillFacets;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.jackson2.Jackson2Plugin;
import org.jdbi.v3.postgres.PostgresPlugin;
import org.jdbi.v3.sqlobject.SqlObjectPlugin;

/**
 * A command to manually run database migrations when needed. The migration requires a heavy DB
 * operation, which can be performed asynchronously (with limited API downtime) thanks to this
 * separate migration command.
 */
@Slf4j
public class DbMigrationCommand<MarquezConfig> extends EnvironmentCommand<marquez.MarquezConfig> {

  private static final String DB_MIGRATE = "db-migrate";
  private static final String MIGRATION_V57_DESCRIPTION =
      """
      A command to manually run the V57 database migration.
      Please refer to https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V57__readme.md for more details.
      """;

  private static final String COMMAND_DESCRIPTION =
      """
      A command to manually run database migrations.
      Extra parameters are required to specify the migration to run.
      """;

  /**
   * Creates a new environment command.
   *
   * @param application the application providing this command
   */
  public DbMigrationCommand(Application<marquez.MarquezConfig> application) {
    super(application, DB_MIGRATE, COMMAND_DESCRIPTION);
  }

  @Override
  public void configure(Subparser subparser) {
    subparser
        .addArgument("--chunkSize")
        .dest("chunkSize")
        .type(Integer.class)
        .required(false)
        .setDefault(V57_1__BackfillFacets.DEFAULT_CHUNK_SIZE)
        .help("number of lineage_events rows processed in a single SQL query and transaction.");

    subparser
        .addArgument("--version")
        .dest("version")
        .type(String.class)
        .required(true)
        .help("migration version to apply, e.g. 'v57'");

    addFileArgument(subparser);
  }

  @Override
  protected void run(
      Environment environment, Namespace namespace, marquez.MarquezConfig configuration)
      throws Exception {

    final DataSourceFactory sourceFactory = configuration.getDataSourceFactory();
    final DataSource source = sourceFactory.build(environment.metrics(), "MarquezApp-source");
    final JdbiFactory factory = new JdbiFactory();

    Jdbi jdbi =
        factory
            .build(
                environment,
                configuration.getDataSourceFactory(),
                (ManagedDataSource) source,
                "postgresql-command")
            .installPlugin(new SqlObjectPlugin())
            .installPlugin(new PostgresPlugin())
            .installPlugin(new Jackson2Plugin());

    MarquezMigrations.valueOf(namespace.getString("version")).run(jdbi, namespace);
  }

  enum MarquezMigrations {
    v57 {
      public void run(Jdbi jdbi, Namespace namespace) throws Exception {
        log.info("Running V57_1__BackfillFacets migration");
        V57_1__BackfillFacets migration = new V57_1__BackfillFacets();
        migration.setManual(true);
        migration.setJdbi(jdbi);
        migration.setChunkSize(namespace.getInt("chunkSize"));
        migration.migrate(null);
      }
    };

    public void run(Jdbi jdbi, Namespace namespace) throws Exception {
      throw new UnsupportedOperationException();
    }
  }
}
diff --git a/api/src/main/java/marquez/db/migrations/V57_1__BackfillFacets.java b/api/src/main/java/marquez/db/migrations/V57_1__BackfillFacets.java
new file mode 100644
index 0000000000..db4163450d
--- /dev/null
+++ b/api/src/main/java/marquez/db/migrations/V57_1__BackfillFacets.java
@@ -0,0 +1,245 @@
/*
 * Copyright 2018-2022 contributors to the Marquez project
 * SPDX-License-Identifier: Apache-2.0
 */

package marquez.db.migrations;

import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import marquez.db.Columns;
import org.flywaydb.core.api.MigrationVersion;
import org.flywaydb.core.api.migration.Context;
import org.flywaydb.core.api.migration.JavaMigration;
import org.jdbi.v3.core.Jdbi;

@Slf4j
public class V57_1__BackfillFacets implements JavaMigration {

  public static int DEFAULT_CHUNK_SIZE = 10000;

  private static int BASIC_MIGRATION_LIMIT = 100000;

  private static final String GET_CURRENT_LOCK_SQL =
      """
      SELECT * FROM facet_migration_lock
      ORDER BY created_at ASC, run_uuid ASC
      LIMIT 1
      """;

  private static final String GET_FINISHING_LOCK_SQL =
      """
      SELECT run_uuid, created_at FROM lineage_events
      ORDER BY
        COALESCE(created_at, event_time) ASC,
        run_uuid ASC
      LIMIT 1
      """;

  private static final String GET_INITIAL_LOCK_SQL =
      """
      SELECT
          run_uuid,
          COALESCE(created_at, event_time, NOW()) + INTERVAL '1 MILLISECONDS' as created_at
      FROM lineage_events ORDER BY COALESCE(created_at, event_time) DESC, run_uuid DESC LIMIT 1
      """;

  private static final String COUNT_LINEAGE_EVENTS_SQL =
      """
      SELECT count(*) as cnt FROM lineage_events
      """;

  private static final String COUNT_LINEAGE_EVENTS_TO_PROCESS_SQL =
      """
      SELECT count(*) as cnt FROM lineage_events e
      WHERE
          COALESCE(e.created_at, e.event_time) < :createdAt
          OR (COALESCE(e.created_at, e.event_time) = :createdAt AND e.run_uuid < :runUuid)
      """;

  private String getBackFillFacetsSQL() {
    return String.format(
        """
        WITH events_chunk AS (
          SELECT e.* FROM lineage_events e
          WHERE
              COALESCE(e.created_at, e.event_time) < :createdAt
              OR (COALESCE(e.created_at, e.event_time) = :createdAt AND e.run_uuid < :runUuid)
          ORDER BY COALESCE(e.created_at, e.event_time) DESC, e.run_uuid DESC
          LIMIT :chunkSize
        ),
        insert_datasets AS (
          INSERT INTO dataset_facets %s
        ),
        insert_runs AS (
          INSERT INTO run_facets %s
        ),
        insert_jobs AS (
          INSERT INTO job_facets %s
        )
        INSERT INTO facet_migration_lock
        SELECT events_chunk.created_at, events_chunk.run_uuid
        FROM events_chunk
        ORDER BY
            COALESCE(events_chunk.created_at, events_chunk.event_time) ASC,
            events_chunk.run_uuid ASC
        LIMIT 1
        RETURNING created_at, run_uuid;
        """,
        V56_1__FacetViews.getDatasetFacetsDefinitionSQL(jdbi, "events_chunk"),
        V56_1__FacetViews.getRunFacetsDefinitionSQL(jdbi, "events_chunk"),
        V56_1__FacetViews.getJobFacetsDefinitionSQL(jdbi, "events_chunk"));
  }

  @Setter private Integer chunkSize = null;

  @Setter private boolean manual = false;

  @Setter private Jdbi jdbi;

  public int getChunkSize() {
    return chunkSize != null ? chunkSize : DEFAULT_CHUNK_SIZE;
  }

  @Override
  public MigrationVersion getVersion() {
    return MigrationVersion.fromVersion("57.2");
  }

  @Override
  public String getDescription() {
    return "BackFillFacets";
  }

  @Override
  public Integer getChecksum() {
    return null;
  }

  @Override
  public boolean isUndo() {
    return false;
  }

  @Override
  public boolean isBaselineMigration() {
    return false;
  }

  @Override
  public boolean canExecuteInTransaction() {
    return false;
  }
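  // Summary of the backfill logic implemented below: events are walked from newest to
  // oldest, :chunkSize rows at a time. Each chunk inserts its facets into
  // dataset_facets, run_facets, and job_facets, then records the oldest
  // (created_at, run_uuid) pair of the chunk in facet_migration_lock. On restart,
  // getLock(GET_CURRENT_LOCK_SQL) resumes from that recorded position, and the loop
  // stops once the lock reaches the oldest lineage event (GET_FINISHING_LOCK_SQL).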
  @Override
  public void migrate(Context context) throws Exception {
    if (context != null) {
      jdbi = Jdbi.create(context.getConnection());
    }

    if (getLock(GET_INITIAL_LOCK_SQL).isEmpty()) {
      // lineage_events table is empty -> no need to run the migration.
      // We still need to create a lock to mark that no data requires migration.
      execute("INSERT INTO facet_migration_lock VALUES (NOW(), uuid_generate_v4())");

      createTargetViews();
      return;
    }
    Optional<MigrationLock> lastExpectedLock = getLock(GET_FINISHING_LOCK_SQL);

    if (!manual && countLineageEvents() >= BASIC_MIGRATION_LIMIT) {
      log.warn(
          """
          ==================================================
          ==================================================
          ==================================================
          MARQUEZ INSTANCE TOO BIG TO RUN AUTO UPGRADE.
          YOU NEED TO RUN THE db-migrate COMMAND MANUALLY.
          FOR MORE DETAILS, PLEASE REFER TO:
          https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V57__readme.md
          ==================================================
          ==================================================
          ==================================================
          """);
      // We end the migration successfully although no data has been migrated to the facet tables.
      return;
    }

    log.info("Configured chunkSize is {}", getChunkSize());
    MigrationLock lock = getLock(GET_CURRENT_LOCK_SQL).orElse(getLock(GET_INITIAL_LOCK_SQL).get());
    while (!lock.equals(lastExpectedLock.get())) {
      lock = backFillChunk(lock);
      log.info(
          "Migrating chunk finished. Still having {} records to migrate.",
          countLineageEventsToProcess(lock));
    }

    createTargetViews();
    log.info("All records migrated");
  }

  private void createTargetViews() {
    // re-create the facet views on top of the new facet tables
    execute("DROP VIEW IF EXISTS run_facets_view");
    execute("DROP VIEW IF EXISTS job_facets_view");
    execute("DROP VIEW IF EXISTS dataset_facets_view");
    execute("CREATE OR REPLACE VIEW run_facets_view AS SELECT * FROM run_facets");
    execute("CREATE OR REPLACE VIEW job_facets_view AS SELECT * FROM job_facets");
    execute("CREATE OR REPLACE VIEW dataset_facets_view AS SELECT * FROM dataset_facets");
  }

  private void execute(String sql) {
    jdbi.inTransaction(handle -> handle.execute(sql));
  }

  private MigrationLock backFillChunk(MigrationLock lock) {
    String backFillQuery = getBackFillFacetsSQL();
    return jdbi.withHandle(
        h ->
            h.createQuery(backFillQuery)
                .bind("chunkSize", getChunkSize())
                .bind("createdAt", lock.created_at())
                .bind("runUuid", lock.run_uuid())
                .map(
                    rs ->
                        new MigrationLock(
                            rs.getColumn(Columns.RUN_UUID, UUID.class),
                            rs.getColumn(Columns.CREATED_AT, Instant.class)))
                .one());
  }

  private Optional<MigrationLock> getLock(String sql) {
    return jdbi.withHandle(
        h ->
            h.createQuery(sql)
                .map(
                    rs ->
                        new MigrationLock(
                            rs.getColumn(Columns.RUN_UUID, UUID.class),
                            rs.getColumn(Columns.CREATED_AT, Instant.class)))
                .findFirst());
  }

  private int countLineageEvents() {
    return jdbi.withHandle(
        h ->
            h.createQuery(COUNT_LINEAGE_EVENTS_SQL)
                .map(rs -> rs.getColumn("cnt", Integer.class))
                .one());
  }

  private int countLineageEventsToProcess(MigrationLock lock) {
    return jdbi.withHandle(
        h ->
            h.createQuery(COUNT_LINEAGE_EVENTS_TO_PROCESS_SQL)
                .bind("createdAt", lock.created_at())
                .bind("runUuid", lock.run_uuid())
                .map(rs -> rs.getColumn("cnt", Integer.class))
                .one());
  }

  private record MigrationLock(UUID run_uuid, Instant created_at) {}
}
diff --git a/api/src/main/resources/marquez/db/migration/V57.1__add_migration_lock.sql b/api/src/main/resources/marquez/db/migration/V57.1__add_migration_lock.sql
new file mode 100644
index 0000000000..1d2a059f54
--- /dev/null
+++ b/api/src/main/resources/marquez/db/migration/V57.1__add_migration_lock.sql
@@ -0,0 +1,4 @@
CREATE TABLE facet_migration_lock (
    created_at TIMESTAMPTZ,
    run_uuid UUID
);
\ No newline at end of file
diff --git a/api/src/main/resources/marquez/db/migration/V57__readme.md b/api/src/main/resources/marquez/db/migration/V57__readme.md
new file mode 100644
index 0000000000..171620bd35
--- /dev/null
+++ b/api/src/main/resources/marquez/db/migration/V57__readme.md
@@ -0,0 +1,63 @@
# V57 MIGRATION

The `V57` database migration introduces significant performance improvements: it splits facets from the `lineage_events`
table into the `dataset_facets`, `run_facets`, and `job_facets` tables. The migration procedure requires moving existing
data to the newly created tables, which can be a heavy and time-consuming task.

> **_NOTE:_** For Marquez instances with more than 100K `lineage_events` rows, an extra manual step is required to upgrade.

> **_NOTE:_** The migration requires a PostgreSQL function to generate UUID values. For Postgres versions >= 13,
> the built-in `gen_random_uuid` function is used. For version 12 or lower, the `uuid-ossp` extension
> needs to be installed. The migration procedure installs it automatically, but this requires superuser access to the DB.
> Alternatively, `CREATE EXTENSION IF NOT EXISTS "uuid-ossp"` can be run manually before the migration.
>
> To sum up:
> * If the Postgres version is >= 13, no extra work is required.
> * If the Postgres version is < 13, either the `uuid-ossp` extension has to be installed or the DB user has to be
>   granted the _SUPERUSER_ role. A pre-flight check is sketched below.
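Both conditions can be verified up front with plain SQL (a minimal pre-flight sketch, run via `psql` or any other
client; the extension statement is the one quoted in the note above):

```sql
-- Postgres version: 13 or higher means the built-in gen_random_uuid is available.
SHOW server_version;

-- More than 100K rows means the manual migration path described below applies.
SELECT COUNT(*) FROM lineage_events;

-- On Postgres < 13 only: install the extension ahead of time (requires superuser).
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
```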
## <= 100K rows in `lineage_events` table

A standard Flyway migration is run, which backfills the newly created `dataset_facets`, `run_facets`, and `job_facets`
tables. No extra work is required, but be prepared for a couple of minutes of downtime when performing the upgrade.

## > 100K rows in `lineage_events` table

For heavy users, the standard migration does not copy data to the newly created tables. The advantage of this approach
is that the upgrade takes just a moment, after which the API can be started to consume new OpenLineage events while
the migration runs asynchronously. Please note that until the migration finishes, some API calls may return incomplete
results, especially if the output is based on facets.

To run the migration, execute the following command:
```shell
java -jar api/build/libs/marquez-api-0.30.0-SNAPSHOT.jar db-migrate --version v57 ./marquez.yml
```
The command processes data in chunks, each chunk is run in a transaction, and the command stores state recording which
chunks have been processed. Based on that:
* it can be stopped at any time,
* it automatically continues with the remaining chunks.

The default chunk size is `10000`, which is the number of `lineage_events` rows processed in a single query. The chunk
size may be adjusted via a command parameter:
```shell
java -jar api/build/libs/marquez-api-0.30.0-SNAPSHOT.jar db-migrate --version v57 --chunkSize 50000 ./marquez.yml
```
The progress of a running (or interrupted) migration can be checked directly in the database, as sketched below.
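The command records the boundary of each processed chunk in the `facet_migration_lock` table, and the two queries below
are the ones it uses internally (substitute the values returned by the first query for `:createdAt` and `:runUuid` in
the second):

```sql
-- Current migration position: the oldest chunk boundary recorded so far.
SELECT created_at, run_uuid
FROM facet_migration_lock
ORDER BY created_at ASC, run_uuid ASC
LIMIT 1;

-- Events older than that position still await migration; the count reaches 0
-- once the backfill is done.
SELECT COUNT(*) AS remaining
FROM lineage_events e
WHERE COALESCE(e.created_at, e.event_time) < :createdAt
   OR (COALESCE(e.created_at, e.event_time) = :createdAt AND e.run_uuid < :runUuid);
```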
## How long can the migration procedure take?

This depends on the size of the `lineage_events` table, but also on the characteristics of each event (how big are the
events? how many facets do they include?).

Performance tests have been implemented in the `V57_BackfillFacetsPerformanceTest` class and run
in a Docker environment (Docker resources: 5 CPUs, 8GB RAM on a Mac M1 Pro). During a test
with 100K `lineage_events` rows, each event was 48KB in size, and the migration resulted
in 500K `job_facets`, 500K `run_facets`, and 1.5M `dataset_facets` rows. The migration took *106 seconds*.
The default `chunkSize` was used.

The table below shows migration times for different amounts of `lineage_events` generated the same way:

| `lineage_events` | `job_facets` | `run_facets` | `dataset_facets` | time taken |
|------------------|--------------|--------------|------------------|------------|
| 10K events       | 50K rows     | 50K rows     | 150K rows        | 10sec      |
| 100K events      | 500K rows    | 500K rows    | 1.5M rows        | 106sec     |
| 500K events      | 2.5M rows    | 2.5M rows    | 7.5M rows        | 612sec     |
| 1M events        | 5M rows      | 5M rows      | 15M rows         | 1473sec    |
\ No newline at end of file
diff --git a/api/src/test/java/marquez/api/JdbiUtils.java b/api/src/test/java/marquez/api/JdbiUtils.java
index aff32f9ebb..0197d4b94d 100644
--- a/api/src/test/java/marquez/api/JdbiUtils.java
+++ b/api/src/test/java/marquez/api/JdbiUtils.java
@@ -39,6 +39,7 @@ public static void cleanDatabase(Jdbi jdbi) {
           handle.execute("DELETE FROM dataset_facets");
           handle.execute("DELETE FROM run_facets");
           handle.execute("DELETE FROM job_facets");
+          handle.execute("DELETE FROM facet_migration_lock");
           return null;
         });
   }
diff --git a/api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java b/api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java
new file mode 100644
index 0000000000..0118f8ad36
--- /dev/null
+++ b/api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java
@@ -0,0 +1,449 @@
/*
 * Copyright 2018-2022 contributors to the Marquez project
 * SPDX-License-Identifier: Apache-2.0
 */

package marquez.db.migrations;

import static marquez.db.DatasetFacetsDao.Type.DATASET;
import static marquez.db.DatasetFacetsDao.Type.INPUT;
import static marquez.db.DatasetFacetsDao.Type.OUTPUT;
import static marquez.db.DatasetFacetsDao.Type.UNKNOWN;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.sql.Connection;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import marquez.api.JdbiUtils;
import marquez.db.Columns;
import marquez.db.DatasetFacetsDao;
import marquez.db.FacetTestUtils;
import marquez.db.LineageTestUtils;
import marquez.db.OpenLineageDao;
import marquez.db.models.UpdateLineageRow;
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
import marquez.service.models.LineageEvent;
import org.flywaydb.core.api.migration.Context;
import org.jdbi.v3.core.Jdbi;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.postgresql.util.PGobject;

@org.junit.jupiter.api.Tag("IntegrationTests")
@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
public class V57_1__BackfillFacetsTest {

  private static V57_1__BackfillFacets subject = new V57_1__BackfillFacets();
  private static Jdbi jdbi;

  private static OpenLineageDao openLineageDao;

  UpdateLineageRow lineageRow;
  Context flywayContext = mock(Context.class);
  Connection connection = mock(Connection.class);

  @BeforeAll
  public static void setUpOnce(Jdbi jdbi) {
    V57_1__BackfillFacetsTest.jdbi = jdbi;
    openLineageDao = jdbi.onDemand(OpenLineageDao.class);
  }

  @AfterEach
  public void tearDown(Jdbi jdbi) {
    JdbiUtils.cleanDatabase(jdbi);
  }

  @BeforeEach
  public void beforeEach() {
    when(flywayContext.getConnection()).thenReturn(connection);
    subject.setChunkSize(100);
    JdbiUtils.cleanDatabase(jdbi);
  }
  @Test
  public void testDatasetFacet() throws Exception {
    lineageRow = FacetTestUtils.createLineageWithFacets(openLineageDao);
    try (MockedStatic<Jdbi> jdbiMockedStatic = Mockito.mockStatic(Jdbi.class)) {
      when(Jdbi.create(connection)).thenReturn(jdbi);

      List<DatasetFacet> expectedInputDatasetFacets =
          getDatasetFacetsFor(lineageRow.getRun().getUuid(), "namespace", "dataset_input");
      List<DatasetFacet> expectedOutputDatasetFacets =
          getDatasetFacetsFor(lineageRow.getRun().getUuid(), "namespace", "dataset_output");

      // clear dataset_facets table
      jdbi.inTransaction(handle -> handle.execute("DELETE FROM dataset_facets"));
      subject.migrate(flywayContext);

      List<DatasetFacet> inputDatasetFacets =
          getDatasetFacetsFor(lineageRow.getRun().getUuid(), "namespace", "dataset_input");
      List<DatasetFacet> outputDatasetFacets =
          getDatasetFacetsFor(lineageRow.getRun().getUuid(), "namespace", "dataset_output");

      assertThat(inputDatasetFacets).hasSize(10);
      assertThat(inputDatasetFacets)
          .containsExactlyInAnyOrder(expectedInputDatasetFacets.toArray(new DatasetFacet[0]));

      assertThat(outputDatasetFacets).hasSize(5);
      assertThat(outputDatasetFacets)
          .containsExactlyInAnyOrder(expectedOutputDatasetFacets.toArray(new DatasetFacet[0]));

      assertThat(inputDatasetFacets).hasSize(10);
      assertThat(getDatasetFacetType(inputDatasetFacets, "documentation")).isEqualTo(DATASET);
      assertThat(getDatasetFacetType(inputDatasetFacets, "schema")).isEqualTo(DATASET);
      assertThat(getDatasetFacetType(inputDatasetFacets, "dataSource")).isEqualTo(DATASET);
      assertThat(getDatasetFacetType(inputDatasetFacets, "description")).isEqualTo(DATASET);
      assertThat(getDatasetFacetType(inputDatasetFacets, "lifecycleStateChange"))
          .isEqualTo(DATASET);
      assertThat(getDatasetFacetType(inputDatasetFacets, "version")).isEqualTo(DATASET);
      assertThat(getDatasetFacetType(inputDatasetFacets, "ownership")).isEqualTo(DATASET);
      assertThat(getDatasetFacetType(inputDatasetFacets, "dataQualityMetrics")).isEqualTo(INPUT);
      assertThat(getDatasetFacetType(inputDatasetFacets, "dataQualityAssertions")).isEqualTo(INPUT);
      assertThat(getDatasetFacetType(inputDatasetFacets, "custom-input")).isEqualTo(UNKNOWN);

      assertThat(
              inputDatasetFacets.stream()
                  .filter(df -> df.name.equals("dataSource"))
                  .findFirst()
                  .get())
          .hasFieldOrPropertyWithValue("runUuid", lineageRow.getRun().getUuid())
          .hasFieldOrPropertyWithValue(
              "datasetUuid", lineageRow.getInputs().get().get(0).getDatasetRow().getUuid())
          .hasFieldOrPropertyWithValue("lineageEventType", "COMPLETE")
          .hasFieldOrPropertyWithValue("lineageEventTime", lineageRow.getRun().getCreatedAt())
          .hasFieldOrPropertyWithValue("type", DATASET)
          .hasFieldOrPropertyWithValue("name", "dataSource");

      assertThat(
              inputDatasetFacets.stream()
                  .filter(df -> df.name.equals("dataSource"))
                  .findFirst()
                  .get()
                  .facet
                  .toString())
          .isEqualTo(
              "{\"dataSource\": {\"uri\": \"http://thesource.com\", \"name\": \"the source\", \"_producer\": \"http://test.producer/\", \"_schemaURL\": \"http://test.schema/\"}}");

      assertThat(outputDatasetFacets).hasSize(5);
      assertThat(outputDatasetFacets.stream().map(df -> df.name).collect(Collectors.toList()))
          .contains("dataSource", "schema", "custom-output");
      assertThat(getDatasetFacetType(outputDatasetFacets, "outputStatistics")).isEqualTo(OUTPUT);
      assertThat(getDatasetFacetType(outputDatasetFacets, "columnLineage")).isEqualTo(DATASET);
    }
  }

  @Test
  public void testMigrateForMultipleChunks() throws Exception {
    lineageRow =
        FacetTestUtils.createLineageWithFacets(openLineageDao);
    try (MockedStatic<Jdbi> jdbiMockedStatic = Mockito.mockStatic(Jdbi.class)) {
      when(Jdbi.create(connection)).thenReturn(jdbi);
      subject.setChunkSize(1);

      int datasetsFacetsBefore = countDatasetFacets(jdbi);
      FacetTestUtils.createLineageWithFacets(openLineageDao);
      lineageRow =
          FacetTestUtils.createLineageWithFacets(
              openLineageDao); // inserted three lineage event rows

      // clear migration lock and dataset_facets table
      jdbi.inTransaction(handle -> handle.execute("DELETE FROM dataset_facets"));
      jdbi.inTransaction(handle -> handle.execute("DELETE FROM facet_migration_lock"));
      subject.migrate(flywayContext);

      int datasetsFacetsAfter = countDatasetFacets(jdbi);
      assertThat(datasetsFacetsAfter).isEqualTo(3 * datasetsFacetsBefore);
    }
  }

  @Test
  public void testWhenCurrentLockIsAvailable() throws Exception {
    FacetTestUtils.createLineageWithFacets(openLineageDao);
    FacetTestUtils.createLineageWithFacets(openLineageDao);
    lineageRow =
        FacetTestUtils.createLineageWithFacets(
            openLineageDao); // point migration_lock to only match the latest lineage event

    jdbi.withHandle(
        h ->
            h.execute(
                """
                INSERT INTO facet_migration_lock
                SELECT created_at, run_uuid FROM lineage_events
                ORDER by created_at DESC LIMIT 1
                """)); // last lineage row should be skipped

    jdbi.withHandle(
        h ->
            h.execute(
                """
                INSERT INTO facet_migration_lock
                SELECT created_at, run_uuid FROM lineage_events
                ORDER by created_at DESC LIMIT 1 OFFSET 1
                """)); // middle lineage row should be skipped

    try (MockedStatic<Jdbi> jdbiMockedStatic = Mockito.mockStatic(Jdbi.class)) {
      when(Jdbi.create(connection)).thenReturn(jdbi);
      subject.setChunkSize(1);

      // clear dataset_facets table (but keep the migration lock)
      jdbi.inTransaction(handle -> handle.execute("DELETE FROM dataset_facets"));
      subject.migrate(flywayContext);

      assertThat(countDatasetFacets(jdbi)).isEqualTo(15);
    }
  }

  @Test
  public void testMigrateForLineageWithNoDatasets() throws Exception {
    LineageEvent.JobFacet jobFacet =
        new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
    LineageTestUtils.createLineageRow(
        openLineageDao,
        "job_" + UUID.randomUUID(),
        "COMPLETE",
        jobFacet,
        Collections.emptyList(),
        Collections.emptyList());
    try (MockedStatic<Jdbi> jdbiMockedStatic = Mockito.mockStatic(Jdbi.class)) {
      when(Jdbi.create(connection)).thenReturn(jdbi);

      subject.migrate(flywayContext);

      int datasetsFacetsAfter = countDatasetFacets(jdbi);

      assertThat(datasetsFacetsAfter).isEqualTo(0);
    }
  }

  @Test
  public void testRunFacet() throws Exception {
    lineageRow = FacetTestUtils.createLineageWithFacets(openLineageDao);
    try (MockedStatic<Jdbi> jdbiMockedStatic = Mockito.mockStatic(Jdbi.class)) {
      when(Jdbi.create(connection)).thenReturn(jdbi);

      List<RunFacet> runFacetsBefore = getRunFacetsFor(lineageRow.getRun().getUuid());
      jdbi.inTransaction(handle -> handle.execute("DELETE FROM dataset_facets"));
      jdbi.inTransaction(handle -> handle.execute("DELETE FROM run_facets"));
      subject.migrate(flywayContext);

      List<RunFacet> runFacets = getRunFacetsFor(lineageRow.getRun().getUuid());

      assertThat(runFacets).hasSize(5);
      assertThat(runFacetsBefore).containsExactlyInAnyOrder(runFacets.toArray(new RunFacet[0]));
      assertThat(runFacets.get(0)).hasFieldOrPropertyWithValue("lineageEventType", "COMPLETE");
      assertThat(runFacets.stream().map(rf -> rf.name).collect(Collectors.toList()))
          .containsExactlyInAnyOrder(
              "parent", "custom-run-facet",
"spark.logicalPlan", "errorMessage", "nominalTime"); + + assertThat( + runFacets.stream() + .filter(rf -> rf.name().equalsIgnoreCase("parent")) + .map(rf -> rf.facet().toString()) + .findFirst() + .get()) + .isEqualTo( + String.format( + "{\"parent\": {\"job\": {\"name\": \"name\", \"namespace\": \"namespace\"}, " + + "\"run\": {\"runId\": \"%s\"}, " + + "\"_producer\": \"http://test.producer/\", " + + "\"_schemaURL\": \"http://test.schema/\"" + + "}}", + lineageRow.getRun().getParentRunUuid().get())); + } + } + + @Test + public void testJobFacet() throws Exception { + lineageRow = FacetTestUtils.createLineageWithFacets(openLineageDao); + try (MockedStatic jdbiMockedStatic = Mockito.mockStatic(Jdbi.class)) { + when(Jdbi.create(connection)).thenReturn(jdbi); + + List jobFacetsBefore = getJobFacetsFor(lineageRow.getRun().getJobUuid()); + jdbi.inTransaction(handle -> handle.execute("DELETE FROM job_facets")); + subject.migrate(flywayContext); + + List jobFacets = getJobFacetsFor(lineageRow.getRun().getJobUuid()); + + assertThat(jobFacets).hasSize(5); + assertThat(jobFacetsBefore).containsExactlyInAnyOrder(jobFacets.toArray(new JobFacet[0])); + + assertThat(jobFacets.get(0)).hasFieldOrPropertyWithValue("lineageEventType", "COMPLETE"); + assertThat(jobFacets.stream().map(rf -> rf.name).collect(Collectors.toList())) + .containsExactlyInAnyOrder( + "sourceCodeLocation", "sourceCode", "documentation", "sql", "ownership"); + + assertThat( + jobFacets.stream() + .filter(rf -> rf.name().equalsIgnoreCase("sourceCodeLocation")) + .map(rf -> rf.facet().toString()) + .findFirst() + .get()) + .isEqualTo( + "{\"sourceCodeLocation\": {\"url\": \"git@github.com:OpenLineage/OpenLineage.git\", " + + "\"type\": \"git\", \"_producer\": \"http://test.producer/\", " + + "\"_schemaURL\": \"http://test.schema/\"}}"); + } + } + + @Test + public void testMigrationLockIsInsertedWhenNoDataToMigrate() throws Exception { + try (MockedStatic jdbiMockedStatic = Mockito.mockStatic(Jdbi.class)) { + when(Jdbi.create(connection)).thenReturn(jdbi); + + // should be no data to import + subject.migrate(flywayContext); + + Instant lockCreatedAt = + jdbi.withHandle( + h -> + h.createQuery("SELECT created_at FROM facet_migration_lock") + .map(rs -> rs.getColumn("created_at", Instant.class)) + .one()); + + // verify migration lock exists + assertThat(lockCreatedAt).isBefore(Instant.now()); + } + } + + @Test + public void testMigrationForLineageEventsWithNullCreatedAtField() throws Exception { + FacetTestUtils.createLineageWithFacets(openLineageDao); + FacetTestUtils.createLineageWithFacets(openLineageDao); + jdbi.inTransaction(h -> h.execute("UPDATE lineage_events SET created_at = NULL")); + + try (MockedStatic jdbiMockedStatic = Mockito.mockStatic(Jdbi.class)) { + when(Jdbi.create(connection)).thenReturn(jdbi); + + jdbi.inTransaction(handle -> handle.execute("DELETE FROM facet_migration_lock")); + subject.migrate(flywayContext); + } + } + + private int countDatasetFacets(Jdbi jdbi) { + return jdbi.withHandle( + h -> + h.createQuery("SELECT count(*) as cnt FROM dataset_facets") + .map(rs -> rs.getColumn("cnt", Integer.class)) + .one()); + } + + private DatasetFacetsDao.Type getDatasetFacetType(List facets, String facetName) { + return facets.stream() + .filter(df -> df.name.equalsIgnoreCase(facetName)) + .findFirst() + .map(df -> df.type) + .orElse(null); + } + + private List getDatasetFacetsFor( + UUID runUuid, String datasetNamespace, String datasetName) { + return jdbi.withHandle( + h -> + h + .createQuery( + """ + SELECT df.* FROM 
                    dataset_facets df
                    JOIN datasets_view d ON d.uuid = df.dataset_uuid
                    WHERE df.run_uuid = :runUuid AND d.name = :datasetName
                    AND d.namespace_name = :datasetNamespace
                    """)
                .bind("runUuid", runUuid)
                .bind("datasetNamespace", datasetNamespace)
                .bind("datasetName", datasetName)
                .map(
                    rs ->
                        new DatasetFacet(
                            // rs.getColumn("uuid", UUID.class), omit uuid field
                            // rs.getColumn(Columns.CREATED_AT, Instant.class), created_at field can
                            // differ
                            rs.getColumn(Columns.DATASET_UUID, UUID.class),
                            rs.getColumn(Columns.RUN_UUID, UUID.class),
                            rs.getColumn("lineage_event_time", Instant.class),
                            rs.getColumn("lineage_event_type", String.class),
                            rs.getColumn(Columns.TYPE, DatasetFacetsDao.Type.class),
                            rs.getColumn(Columns.NAME, String.class),
                            rs.getColumn("facet", PGobject.class)))
                .stream()
                .toList());
  }

  private List<RunFacet> getRunFacetsFor(UUID runUuid) {
    return jdbi.withHandle(
        h ->
            h
                .createQuery("SELECT * FROM run_facets WHERE run_uuid = :runUuid")
                .bind("runUuid", runUuid)
                .map(
                    rs ->
                        new RunFacet(
                            // rs.getColumn("uuid", UUID.class), omit uuid field
                            // rs.getColumn(Columns.CREATED_AT, Instant.class), created_at field can
                            // differ
                            rs.getColumn(Columns.RUN_UUID, UUID.class),
                            rs.getColumn("lineage_event_time", Instant.class),
                            rs.getColumn("lineage_event_type", String.class),
                            rs.getColumn(Columns.NAME, String.class),
                            rs.getColumn("facet", PGobject.class)))
                .stream()
                .toList());
  }

  private List<JobFacet> getJobFacetsFor(UUID jobUuid) {
    return jdbi.withHandle(
        h ->
            h
                .createQuery("SELECT * FROM job_facets WHERE job_uuid = :jobUuid")
                .bind("jobUuid", jobUuid)
                .map(
                    rs ->
                        new JobFacet(
                            rs.getColumn(Columns.JOB_UUID, UUID.class),
                            rs.getColumn(Columns.RUN_UUID, UUID.class),
                            rs.getColumn("lineage_event_time", Instant.class),
                            rs.getColumn("lineage_event_type", String.class),
                            rs.getColumn(Columns.NAME, String.class),
                            rs.getColumn("facet", PGobject.class)))
                .stream()
                .toList());
  }

  record JobFacet(
      // UUID uuid, omit uuid field
      // Instant createdAt, createdAt field can differ
      UUID jobUuid,
      UUID runUuid,
      Instant lineageEventTime,
      String lineageEventType,
      String name,
      PGobject facet) {}

  record DatasetFacet(
      // UUID uuid, omit uuid field
      // Instant createdAt, createdAt field can differ
      UUID datasetUuid,
      UUID runUuid,
      Instant lineageEventTime,
      String lineageEventType,
      DatasetFacetsDao.Type type,
      String name,
      PGobject facet) {}

  record RunFacet(
      // UUID uuid, omit uuid field
      // Instant createdAt, createdAt field can differ
      UUID runUuid,
      Instant lineageEventTime,
      String lineageEventType,
      String name,
      PGobject facet) {}
}
diff --git a/api/src/test/java/marquez/db/migrations/V57_BackfillFacetsPerformanceTest.java b/api/src/test/java/marquez/db/migrations/V57_BackfillFacetsPerformanceTest.java
new file mode 100644
index 0000000000..0792697b10
--- /dev/null
+++ b/api/src/test/java/marquez/db/migrations/V57_BackfillFacetsPerformanceTest.java
@@ -0,0 +1,217 @@
/*
 * Copyright 2018-2022 contributors to the Marquez project
 * SPDX-License-Identifier: Apache-2.0
 */

package marquez.db.migrations;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.sql.Connection;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
import marquez.api.JdbiUtils;
import marquez.db.FacetTestUtils;
import marquez.db.OpenLineageDao;
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
import org.flywaydb.core.api.migration.Context;
import org.jdbi.v3.core.Jdbi;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Test to measure the performance of the V57 migration. Currently not run within CircleCI.
 * Requires the system property `-DrunPerfTest=true` to be executed.
 */
@EnabledIfSystemProperty(named = "runPerfTest", matches = "true")
@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
@Slf4j
public class V57_BackfillFacetsPerformanceTest {

  private static V57_1__BackfillFacets subject = new V57_1__BackfillFacets();
  private static Jdbi jdbi;

  private static OpenLineageDao openLineageDao;

  Context flywayContext = mock(Context.class);
  Connection connection = mock(Connection.class);

  @BeforeAll
  public static void setUpOnce(Jdbi jdbi) {
    V57_BackfillFacetsPerformanceTest.jdbi = jdbi;
    openLineageDao = jdbi.onDemand(OpenLineageDao.class);
  }

  @AfterEach
  public void tearDown(Jdbi jdbi) {
    log.info("Starting tear down");

    // clearing the runs table with foreign key checks enabled takes longer than the migration
    jdbi.inTransaction(
        handle -> {
          handle.execute("ALTER TABLE runs DISABLE TRIGGER ALL;");
          return null;
        });
    JdbiUtils.cleanDatabase(jdbi);
    jdbi.inTransaction(
        handle -> {
          handle.execute("ALTER TABLE runs ENABLE TRIGGER ALL;");
          return null;
        });
  }

  @BeforeEach
  public void beforeEach() {
    when(flywayContext.getConnection()).thenReturn(connection);
    JdbiUtils.cleanDatabase(jdbi);
  }

  @Test
  public void testMigration() throws Exception {
    prepareLineageEventsTable();
    log.info("Cleaning existing facets tables");
    clearFacetsTablesAndLock();

    log.info("Starting migration script");
    subject.setChunkSize(10000);
    subject.setManual(true);
    subject.setJdbi(jdbi);

    Instant before = Instant.now();
    subject.migrate(null);
    Instant after = Instant.now();

    log.info(
        "Successfully migrated {} lineage events, which resulted in {} job_facets rows, "
            + "{} run_facets rows and {} dataset_facets rows.",
        countTableRows("lineage_events"),
        countTableRows("job_facets"),
        countTableRows("run_facets"),
        countTableRows("dataset_facets"));
    log.info("Migration took {} seconds.", Duration.between(before, after).toSeconds());

    assertThat(countTableRows("dataset_facets")).isGreaterThan(countTableRows("lineage_events"));
    assertThat(countTableRows("job_facets")).isGreaterThan(countTableRows("lineage_events"));
    assertThat(countTableRows("run_facets")).isGreaterThan(countTableRows("lineage_events"));
  }

  private void prepareLineageEventsTable() {
    /*
     * Workaround to register the uuid_generate_v4 function to generate UUIDs.
     * gen_random_uuid() is available only since Postgres 13.
     */
    jdbi.withHandle(h -> h.createCall("CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\"").invoke());

    // generate 10 rows
    IntStream.range(0, 10).forEach(i -> FacetTestUtils.createLineageWithFacets(openLineageDao));

    // duplicate 10 events 9 times to have 100 events
    IntStream.range(0, 9).forEach(i -> duplicateLineageEvents(10));

    // duplicate 100 events 9 times to have 1000 events
    IntStream.range(0, 9).forEach(i -> duplicateLineageEvents(100));

    // duplicate 1000 events 9 times to have 10K events
    IntStream.range(0, 9).forEach(i -> duplicateLineageEvents(1000));

    // duplicate 10K events 9 times to have 100K events
    IntStream.range(0, 9).forEach(i -> duplicateLineageEvents(10000));

    // duplicate 100K events 9 times to have 1M events
    IntStream.range(0, 9).forEach(i -> duplicateLineageEvents(100000));

    // create run rows to satisfy run constraints
    jdbi.withHandle(
        h ->
            h.createCall(
                    """
                    WITH single_run AS (
                      SELECT * FROM runs limit 1
                    )
                    INSERT INTO runs
                    SELECT
                      e.run_uuid as uuid,
                      sr.created_at,
                      sr.updated_at,
                      sr.job_version_uuid,
                      sr.run_args_uuid,
                      sr.nominal_start_time,
                      sr.nominal_end_time,
                      sr.current_run_state,
                      sr.start_run_state_uuid,
                      sr.end_run_state_uuid,
                      sr.external_id,
                      sr.namespace_name,
                      sr.job_name,
                      sr.location,
                      sr.transitioned_at,
                      sr.started_at,
                      sr.ended_at,
                      sr.job_context_uuid,
                      sr.parent_run_uuid,
                      sr.job_uuid
                    FROM single_run sr, lineage_events e
                    ON CONFLICT DO NOTHING
                    """)
                .invoke());

    log.info(
        "Generated {} lineage events, each {} bytes in size",
        countTableRows("lineage_events"),
        jdbi.withHandle(
            h ->
                h.createQuery(
                        "SELECT OCTET_LENGTH(event::text) AS bytes FROM lineage_events LIMIT 1")
                    .map(rs -> rs.getColumn("bytes", Integer.class))
                    .one()));
  }

  private void duplicateLineageEvents(int numberOfDuplicatedEvents) {
    jdbi.inTransaction(
        handle ->
            handle
                .createUpdate(
                    """
                    INSERT INTO lineage_events
                    SELECT
                      NOW(),
                      event,
                      event_type,
                      job_name,
                      job_namespace,
                      producer,
                      uuid_generate_v4(),
                      created_at
                    FROM lineage_events
                    LIMIT :numberOfDuplicatedEvents
                    """)
                .bind("numberOfDuplicatedEvents", numberOfDuplicatedEvents)
                .execute());
  }

  private void clearFacetsTablesAndLock() {
    Arrays.asList(
            "DELETE FROM job_facets",
            "DELETE FROM dataset_facets",
            "DELETE FROM run_facets",
            "DELETE FROM facet_migration_lock")
        .stream()
        .forEach(sql -> jdbi.inTransaction(handle -> handle.execute(sql)));
  }

  private int countTableRows(String table) {
    return jdbi.withHandle(
        h ->
            h.createQuery("SELECT COUNT(*) AS cnt FROM " + table)
                .map(rs -> rs.getColumn("cnt", Integer.class))
                .one());
  }
}
diff --git a/codecov.yml b/codecov.yml
index 363a04d59a..9feef68fb0 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -8,4 +8,5 @@ coverage:
   ignore:
     - "api/src/main/java/marquez/db/migrations/V44_1__UpdateRunsWithJobUUID.java"
     - "api/src/main/java/marquez/db/migrations/V44_2__BackfillAirflowParentRuns.java"
-    - "api/src/main/java/marquez/db/migrations/V44_3_BackfillJobsWithParents.java"
\ No newline at end of file
+    - "api/src/main/java/marquez/db/migrations/V44_3_BackfillJobsWithParents.java"
+    - "api/src/main/java/marquez/cli/DbMigrationCommand.java"
\ No newline at end of file