Skip to content

Commit

Permalink
OL facets - PR3 - migrate data to facet tables
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <[email protected]>
  • Loading branch information
pawel-big-lebowski committed Jan 19, 2023
1 parent 1d1fec4 commit 83929cd
Show file tree
Hide file tree
Showing 10 changed files with 1,099 additions and 1 deletion.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.29.0...HEAD)

### Added

* Split `lineage_events` table to `dataset_facets`, `run_facets`, and `job_facets` tables. [`2350`](https://github.com/MarquezProject/marquez/pull/2350), [`2355`](https://github.com/MarquezProject/marquez/pull/2355), [`2359`](https://github.com/MarquezProject/marquez/pull/2359)
[@wslulciuc](https://github.com/wslulciuc), [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Performance improvement with migration procedure that requires manual steps if database has more than 100K lineage events.
* We highly encourage users to review our [migration plan](https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V57__readme.md).

## [0.29.0](https://github.com/MarquezProject/marquez/compare/0.28.0...0.29.0) - 2022-12-19

### Added
Expand Down
7 changes: 7 additions & 0 deletions api/src/main/java/marquez/MarquezApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.api.filter.JobRedirectFilter;
import marquez.cli.DbMigrationsCommand;
import marquez.cli.MetadataCommand;
import marquez.cli.SeedCommand;
import marquez.common.Utils;
Expand Down Expand Up @@ -149,6 +150,12 @@ public void registerResources(
}
}

@Override
protected void addDefaultCommands(Bootstrap<MarquezConfig> bootstrap) {
  // Register the manual DB-migration command in addition to Dropwizard's
  // built-in defaults ('server', 'check', ...). Our command is added first,
  // then the superclass installs the standard set.
  bootstrap.addCommand(new DbMigrationsCommand<>(this));
  super.addDefaultCommands(bootstrap);
}

private MarquezContext buildMarquezContext(
MarquezConfig config, Environment env, ManagedDataSource source) {
final JdbiFactory factory = new JdbiFactory();
Expand Down
113 changes: 113 additions & 0 deletions api/src/main/java/marquez/cli/DbMigrationsCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.cli;

import io.dropwizard.Application;
import io.dropwizard.cli.EnvironmentCommand;
import io.dropwizard.db.DataSourceFactory;
import io.dropwizard.db.ManagedDataSource;
import io.dropwizard.jdbi3.JdbiFactory;
import io.dropwizard.setup.Environment;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import marquez.db.migrations.V57_1__BackfillFacets;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.jackson2.Jackson2Plugin;
import org.jdbi.v3.postgres.PostgresPlugin;
import org.jdbi.v3.sqlobject.SqlObjectPlugin;

/**
 * A command to manually run database migrations when needed. This migration requires a heavy DB
 * operation which can be done asynchronously (with limited API downtime) due to separate migration
 * command.
 */
@Slf4j
public class DbMigrationsCommand<T> extends EnvironmentCommand<marquez.MarquezConfig> {
  // NOTE: the type parameter <T> is unused but kept so existing callers written as
  // `new DbMigrationsCommand<>(app)` keep compiling. It was previously named
  // `MarquezConfig`, which shadowed marquez.MarquezConfig and forced fully-qualified
  // references throughout this class.

  private static final String DB_MIGRATE = "db_migrate";

  // NOTE(review): currently unreferenced; kept for documentation value. Consider wiring it
  // into the `--version` argument help.
  private static final String MIGRATION_V57_DESCRIPTION =
      """
      A command to manually run V57 database migration.
      Please refer to https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V57__readme.md for more details.
      """;

  private static final String COMMAND_DESCRIPTION =
      """
      A command to manually run database migrations.
      Extra parameters are required to specify the migration to run.
      """;

  /**
   * Creates a new environment command.
   *
   * @param application the application providing this command
   */
  public DbMigrationsCommand(Application<marquez.MarquezConfig> application) {
    super(application, DB_MIGRATE, COMMAND_DESCRIPTION);
  }

  @Override
  public void configure(Subparser subparser) {
    subparser
        .addArgument("--chunkSize")
        .dest("chunkSize")
        .type(Integer.class)
        .required(false)
        .setDefault(V57_1__BackfillFacets.DEFAULT_CHUNK_SIZE)
        .help("amount of lineage_events rows processed in a single SQL query and transaction.");

    subparser
        .addArgument("--version")
        .dest("version")
        .type(String.class)
        .required(true)
        .help("migration version to apply like 'v57'");

    addFileArgument(subparser);
  }

  /**
   * Builds a Jdbi handle against the configured data source and dispatches to the migration
   * selected via {@code --version}.
   *
   * @throws IllegalArgumentException if {@code --version} does not name a known migration
   */
  @Override
  protected void run(
      Environment environment, Namespace namespace, marquez.MarquezConfig configuration)
      throws Exception {

    final DataSourceFactory sourceFactory = configuration.getDataSourceFactory();
    final DataSource source = sourceFactory.build(environment.metrics(), "MarquezApp-source");
    final JdbiFactory factory = new JdbiFactory();

    Jdbi jdbi =
        factory
            .build(
                environment,
                configuration.getDataSourceFactory(),
                (ManagedDataSource) source,
                "postgresql-command")
            .installPlugin(new SqlObjectPlugin())
            .installPlugin(new PostgresPlugin())
            .installPlugin(new Jackson2Plugin());

    final String version = namespace.getString("version");
    final MarquezMigrations migration;
    try {
      migration = MarquezMigrations.valueOf(version);
    } catch (IllegalArgumentException e) {
      // valueOf's own message ("No enum constant ...") is unhelpful for a CLI user.
      throw new IllegalArgumentException(
          "Unsupported migration version: '" + version + "'. Supported versions: v57", e);
    }
    migration.run(jdbi, namespace);
  }

  /** Known manually-runnable migrations, keyed by the value passed to {@code --version}. */
  enum MarquezMigrations {
    v57 {
      @Override
      public void run(Jdbi jdbi, Namespace namespace) throws Exception {
        log.info("Running V57_1__BackfillFacets migration");
        V57_1__BackfillFacets migration = new V57_1__BackfillFacets();
        migration.setManual(true);
        migration.setJdbi(jdbi);
        migration.setChunkSize(namespace.getInt("chunkSize"));
        // Manual mode: no Flyway Context is available, the migration uses the Jdbi set above.
        migration.migrate(null);
      }
    };

    public void run(Jdbi jdbi, Namespace namespace) throws Exception {
      throw new UnsupportedOperationException();
    }
  }
}
245 changes: 245 additions & 0 deletions api/src/main/java/marquez/db/migrations/V57_1__BackfillFacets.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.migrations;

import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import marquez.db.Columns;
import org.flywaydb.core.api.MigrationVersion;
import org.flywaydb.core.api.migration.Context;
import org.flywaydb.core.api.migration.JavaMigration;
import org.jdbi.v3.core.Jdbi;

/**
 * Backfills the {@code dataset_facets}, {@code run_facets}, and {@code job_facets} tables from
 * {@code lineage_events}, walking the events table newest-to-oldest in chunks and tracking
 * progress in {@code facet_migration_lock}. Can run automatically under Flyway (small instances)
 * or manually via the {@code db_migrate} CLI command (large instances).
 */
@Slf4j
public class V57_1__BackfillFacets implements JavaMigration {

  /** Default number of lineage_events rows processed per chunk; overridable via the CLI. */
  public static final int DEFAULT_CHUNK_SIZE = 10000;

  /** Above this many lineage events, the automatic (Flyway-run) migration refuses to run. */
  private static final int BASIC_MIGRATION_LIMIT = 100000;

  private static final String GET_CURRENT_LOCK_SQL =
      """
      SELECT * FROM facet_migration_lock
      ORDER BY created_at ASC, run_uuid ASC
      LIMIT 1
      """;

  private static final String GET_FINISHING_LOCK_SQL =
      """
      SELECT run_uuid, created_at FROM lineage_events
      ORDER BY
      COALESCE(created_at, event_time) ASC,
      run_uuid ASC
      LIMIT 1
      """;

  private static final String GET_INITIAL_LOCK_SQL =
      """
      SELECT
          run_uuid,
          COALESCE(created_at, event_time, NOW()) + INTERVAL '1 MILLISECONDS' as created_at
      FROM lineage_events ORDER BY COALESCE(created_at, event_time) DESC, run_uuid DESC LIMIT 1
      """;

  private static final String COUNT_LINEAGE_EVENTS_SQL =
      """
      SELECT count(*) as cnt FROM lineage_events
      """;

  private static final String COUNT_LINEAGE_EVENTS_TO_PROCESS_SQL =
      """
      SELECT count(*) as cnt FROM lineage_events e
      WHERE
          COALESCE(e.created_at, e.event_time) < :createdAt
          OR (COALESCE(e.created_at, e.event_time) = :createdAt AND e.run_uuid < :runUuid)
      """;

  /**
   * Builds the chunked backfill statement: select the next chunk of events older than the current
   * lock, insert their facets into the three facet tables via CTEs, and record the oldest
   * processed event as the new lock row.
   */
  private String getBackFillFacetsSQL() {
    return String.format(
        """
        WITH events_chunk AS (
          SELECT e.* FROM lineage_events e
          WHERE
              COALESCE(e.created_at, e.event_time) < :createdAt
              OR (COALESCE(e.created_at, e.event_time) = :createdAt AND e.run_uuid < :runUuid)
          ORDER BY COALESCE(e.created_at, e.event_time) DESC, e.run_uuid DESC
          LIMIT :chunkSize
        ),
        insert_datasets AS (
          INSERT INTO dataset_facets %s
        ),
        insert_runs AS (
          INSERT INTO run_facets %s
        ),
        insert_jobs AS (
          INSERT INTO job_facets %s
        )
        INSERT INTO facet_migration_lock
        SELECT events_chunk.created_at, events_chunk.run_uuid
        FROM events_chunk
        ORDER BY
            COALESCE(events_chunk.created_at, events_chunk.event_time) ASC,
            events_chunk.run_uuid ASC
        LIMIT 1
        RETURNING created_at, run_uuid;
        """,
        V56_1__FacetViews.getDatasetFacetsDefinitionSQL(jdbi, "events_chunk"),
        V56_1__FacetViews.getRunFacetsDefinitionSQL(jdbi, "events_chunk"),
        V56_1__FacetViews.getJobFacetsDefinitionSQL(jdbi, "events_chunk"));
  }

  // Chunk size override; null means "use DEFAULT_CHUNK_SIZE" (see getChunkSize()).
  @Setter private Integer chunkSize = null;

  // True when run from the db_migrate CLI command; disables the size guard below.
  @Setter private boolean manual = false;

  // Injected by the CLI in manual mode; derived from the Flyway Context otherwise.
  @Setter private Jdbi jdbi;

  public int getChunkSize() {
    return chunkSize != null ? chunkSize : DEFAULT_CHUNK_SIZE;
  }

  @Override
  public MigrationVersion getVersion() {
    return MigrationVersion.fromVersion("57.2");
  }

  @Override
  public String getDescription() {
    return "BackFillFacets";
  }

  @Override
  public Integer getChecksum() {
    return null;
  }

  @Override
  public boolean isUndo() {
    return false;
  }

  @Override
  public boolean isBaselineMigration() {
    return false;
  }

  @Override
  public boolean canExecuteInTransaction() {
    return false;
  }

  /**
   * Runs the backfill. With a Flyway {@link Context} the connection comes from Flyway; in manual
   * mode {@code context} is null and the injected {@link Jdbi} is used. Skips (successfully) when
   * the instance is too large for automatic migration, directing the operator to the CLI command.
   */
  @Override
  public void migrate(Context context) throws Exception {
    if (context != null) {
      jdbi = Jdbi.create(context.getConnection());
    }

    if (getLock(GET_INITIAL_LOCK_SQL).isEmpty()) {
      // lineage_events table is empty -> no need to run migration
      // anyway. we need to create lock to mark that no data requires migration
      execute("INSERT INTO facet_migration_lock VALUES (NOW(), uuid_generate_v4())");

      createTargetViews();
      return;
    }
    Optional<MigrationLock> lastExpectedLock = getLock(GET_FINISHING_LOCK_SQL);

    if (!manual && countLineageEvents() >= BASIC_MIGRATION_LIMIT) {
      // FIX: the banner previously said "v55_migrate", but the actual CLI command is
      // "db_migrate" (see DbMigrationsCommand.DB_MIGRATE).
      log.warn(
          """
          ==================================================
          ==================================================
          ==================================================
          MARQUEZ INSTANCE TOO BIG TO RUN AUTO UPGRADE.
          YOU NEED TO RUN db_migrate COMMAND MANUALLY.
          FOR MORE DETAILS, PLEASE REFER TO:
          https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V57__readme.md
          ==================================================
          ==================================================
          ==================================================
          """);
      // We end migration successfully although no data has been migrated to facet tables
      return;
    }

    log.info("Configured chunkSize is {}", getChunkSize());
    // Resume from the current lock if a previous run was interrupted; otherwise start from the
    // newest event. orElseGet avoids issuing the second query when a lock already exists
    // (the original orElse(...) evaluated it eagerly).
    MigrationLock lock =
        getLock(GET_CURRENT_LOCK_SQL)
            .orElseGet(() -> getLock(GET_INITIAL_LOCK_SQL).orElseThrow());
    // Present by construction: lineage_events was verified non-empty above.
    final MigrationLock finishingLock = lastExpectedLock.orElseThrow();
    while (!lock.equals(finishingLock)) {
      lock = backFillChunk(lock);
      log.info(
          "Migrating chunk finished. Still having {} records to migrate.",
          countLineageEventsToProcess(lock));
    }

    createTargetViews();
    log.info("All records migrated");
  }

  /** Replaces the temporary facet views with views over the now-populated facet tables. */
  private void createTargetViews() {
    // replace facet views with tables
    execute("DROP VIEW IF EXISTS run_facets_view");
    execute("DROP VIEW IF EXISTS job_facets_view");
    execute("DROP VIEW IF EXISTS dataset_facets_view");
    execute("CREATE OR REPLACE VIEW run_facets_view AS SELECT * FROM run_facets");
    execute("CREATE OR REPLACE VIEW job_facets_view AS SELECT * FROM job_facets");
    execute("CREATE OR REPLACE VIEW dataset_facets_view AS SELECT * FROM dataset_facets");
  }

  private void execute(String sql) {
    jdbi.inTransaction(handle -> handle.execute(sql));
  }

  /** Processes one chunk older than {@code lock}; returns the new lock (oldest processed event). */
  private MigrationLock backFillChunk(MigrationLock lock) {
    String backFillQuery = getBackFillFacetsSQL();
    return jdbi.withHandle(
        h ->
            h.createQuery(backFillQuery)
                .bind("chunkSize", getChunkSize())
                .bind("createdAt", lock.created_at)
                .bind("runUuid", lock.run_uuid)
                .map(
                    rs ->
                        new MigrationLock(
                            rs.getColumn(Columns.RUN_UUID, UUID.class),
                            rs.getColumn(Columns.CREATED_AT, Instant.class)))
                .one());
  }

  private Optional<MigrationLock> getLock(String sql) {
    return jdbi.withHandle(
        h ->
            h.createQuery(sql)
                .map(
                    rs ->
                        new MigrationLock(
                            rs.getColumn(Columns.RUN_UUID, UUID.class),
                            rs.getColumn(Columns.CREATED_AT, Instant.class)))
                .findFirst());
  }

  private int countLineageEvents() {
    return jdbi.withHandle(
        h ->
            h.createQuery(COUNT_LINEAGE_EVENTS_SQL)
                .map(rs -> rs.getColumn("cnt", Integer.class))
                .one());
  }

  private int countLineageEventsToProcess(MigrationLock lock) {
    return jdbi.withHandle(
        h ->
            h.createQuery(COUNT_LINEAGE_EVENTS_TO_PROCESS_SQL)
                .bind("createdAt", lock.created_at)
                .bind("runUuid", lock.run_uuid)
                .map(rs -> rs.getColumn("cnt", Integer.class))
                .one());
  }

  /** Progress marker: the (created_at, run_uuid) of the oldest event processed so far. */
  private record MigrationLock(UUID run_uuid, Instant created_at) {}
}
Loading

0 comments on commit 83929cd

Please sign in to comment.