Skip to content

Commit

Permalink
Core: Fix JDBC Catalog table commit when migrating from schema V0 to …
Browse files Browse the repository at this point in the history
…V1 (#10111) (#10152)
  • Loading branch information
jbonofre authored Apr 16, 2024
1 parent b2a4fda commit c45c9f6
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 9 deletions.
38 changes: 32 additions & 6 deletions core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,31 @@ enum SchemaVersion {
static final String TABLE_RECORD_TYPE = "TABLE";
static final String VIEW_RECORD_TYPE = "VIEW";

private static final String V1_DO_COMMIT_SQL =
private static final String V1_DO_COMMIT_TABLE_SQL =
"UPDATE "
+ CATALOG_TABLE_VIEW_NAME
+ " SET "
+ JdbcTableOperations.METADATA_LOCATION_PROP
+ " = ? , "
+ JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP
+ " = ?"
+ " WHERE "
+ CATALOG_NAME
+ " = ? AND "
+ TABLE_NAMESPACE
+ " = ? AND "
+ TABLE_NAME
+ " = ? AND "
+ JdbcTableOperations.METADATA_LOCATION_PROP
+ " = ? AND ("
+ RECORD_TYPE
+ " = '"
+ TABLE_RECORD_TYPE
+ "'"
+ " OR "
+ RECORD_TYPE
+ " IS NULL)";
private static final String V1_DO_COMMIT_VIEW_SQL =
"UPDATE "
+ CATALOG_TABLE_VIEW_NAME
+ " SET "
Expand All @@ -72,7 +96,10 @@ enum SchemaVersion {
+ JdbcTableOperations.METADATA_LOCATION_PROP
+ " = ? AND "
+ RECORD_TYPE
+ " = ?";
+ " = "
+ "'"
+ VIEW_RECORD_TYPE
+ "'";
private static final String V0_DO_COMMIT_SQL =
"UPDATE "
+ CATALOG_TABLE_VIEW_NAME
Expand Down Expand Up @@ -504,7 +531,9 @@ private static int update(
conn -> {
try (PreparedStatement sql =
conn.prepareStatement(
(schemaVersion == SchemaVersion.V1) ? V1_DO_COMMIT_SQL : V0_DO_COMMIT_SQL)) {
(schemaVersion == SchemaVersion.V1)
? (isTable ? V1_DO_COMMIT_TABLE_SQL : V1_DO_COMMIT_VIEW_SQL)
: V0_DO_COMMIT_SQL)) {
// UPDATE
sql.setString(1, newMetadataLocation);
sql.setString(2, oldMetadataLocation);
Expand All @@ -513,9 +542,6 @@ private static int update(
sql.setString(4, namespaceToString(identifier.namespace()));
sql.setString(5, identifier.name());
sql.setString(6, oldMetadataLocation);
if (schemaVersion == SchemaVersion.V1) {
sql.setString(7, isTable ? TABLE_RECORD_TYPE : VIEW_RECORD_TYPE);
}

return sql.executeUpdate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {
.withRecordCount(2) // needs at least one record or else metrics will filter it out
.build();

static final DataFile FILE_B =
protected static final DataFile FILE_B =
DataFiles.builder(SPEC)
.withPath("/path/to/data-b.parquet")
.withFileSizeInBytes(10)
Expand Down
62 changes: 60 additions & 2 deletions core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -234,6 +235,17 @@ public void testSchemaIsMigratedToAddViewSupport() throws Exception {
.create();

assertThat(jdbcCatalog.listViews(Namespace.of("namespace1"))).hasSize(1).containsExactly(view);

TableIdentifier tableThree = TableIdentifier.of("namespace2", "table3");
jdbcCatalog.createTable(tableThree, SCHEMA);
assertThat(jdbcCatalog.tableExists(tableThree)).isTrue();

// testing append datafile to check commit, it should not throw an exception
jdbcCatalog.loadTable(tableOne).newAppend().appendFile(FILE_A).commit();
jdbcCatalog.loadTable(tableTwo).newAppend().appendFile(FILE_B).commit();

assertThat(jdbcCatalog.tableExists(tableOne)).isTrue();
assertThat(jdbcCatalog.tableExists(tableTwo)).isTrue();
}

@ParameterizedTest
Expand Down Expand Up @@ -979,7 +991,49 @@ public void report(MetricsReport report) {
}
}

private String createMetadataLocationViaJdbcCatalog(TableIdentifier identifier)
throws SQLException {
// temporary connection just to actually create a concrete metadata location
String jdbcUrl = null;
try {
java.nio.file.Path dbFile = Files.createTempFile("temp", "metadata");
jdbcUrl = "jdbc:sqlite:" + dbFile.toAbsolutePath();
} catch (IOException e) {
throw new SQLException("Error while creating temp data", e);
}

Map<String, String> properties = Maps.newHashMap();

properties.put(CatalogProperties.URI, jdbcUrl);

warehouseLocation = this.tableDir.toAbsolutePath().toString();
properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
properties.put("type", "jdbc");

JdbcCatalog jdbcCatalog =
(JdbcCatalog) CatalogUtil.buildIcebergCatalog("TEMP", properties, conf);
jdbcCatalog.buildTable(identifier, SCHEMA).create();

SQLiteDataSource dataSource = new SQLiteDataSource();
dataSource.setUrl(jdbcUrl);

try (Connection connection = dataSource.getConnection()) {
ResultSet result =
connection
.prepareStatement("SELECT * FROM " + JdbcUtil.CATALOG_TABLE_VIEW_NAME)
.executeQuery();
result.next();
return result.getString(JdbcTableOperations.METADATA_LOCATION_PROP);
}
}

private void initLegacySchema(String jdbcUrl) throws SQLException {
TableIdentifier table1 = TableIdentifier.of(Namespace.of("namespace1"), "table1");
TableIdentifier table2 = TableIdentifier.of(Namespace.of("namespace2"), "table2");

String table1MetadataLocation = createMetadataLocationViaJdbcCatalog(table1);
String table2MetadataLocation = createMetadataLocationViaJdbcCatalog(table2);

SQLiteDataSource dataSource = new SQLiteDataSource();
dataSource.setUrl(jdbcUrl);

Expand All @@ -1000,7 +1054,9 @@ private void initLegacySchema(String jdbcUrl) throws SQLException {
+ JdbcTableOperations.METADATA_LOCATION_PROP
+ ","
+ JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP
+ ") VALUES('TEST','namespace1','table1',null,null)")
+ ") VALUES('TEST','namespace1','table1','"
+ table1MetadataLocation
+ "',null)")
.execute();
connection
.prepareStatement(
Expand All @@ -1016,7 +1072,9 @@ private void initLegacySchema(String jdbcUrl) throws SQLException {
+ JdbcTableOperations.METADATA_LOCATION_PROP
+ ","
+ JdbcTableOperations.PREVIOUS_METADATA_LOCATION_PROP
+ ") VALUES('TEST','namespace2','table2',null,null)")
+ ") VALUES('TEST','namespace2','table2','"
+ table2MetadataLocation
+ "',null)")
.execute();
}
}
Expand Down
98 changes: 98 additions & 0 deletions core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.nio.file.Files;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Map;
import java.util.Properties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.jupiter.api.Test;
import org.sqlite.SQLiteDataSource;

public class TestJdbcUtil {

Expand All @@ -45,4 +51,96 @@ public void testFilterAndRemovePrefix() {

assertThat(expected).isEqualTo(actual);
}

@Test
public void testV0toV1SqlStatements() throws Exception {
java.nio.file.Path dbFile = Files.createTempFile("icebergSchemaUpdate", "db");
String jdbcUrl = "jdbc:sqlite:" + dbFile.toAbsolutePath();

SQLiteDataSource dataSource = new SQLiteDataSource();
dataSource.setUrl(jdbcUrl);

try (JdbcClientPool connections = new JdbcClientPool(jdbcUrl, Maps.newHashMap())) {
// create "old style" SQL schema
connections.newClient().prepareStatement(JdbcUtil.V0_CREATE_CATALOG_SQL).executeUpdate();

// inserting tables
JdbcUtil.doCommitCreateTable(
JdbcUtil.SchemaVersion.V0,
connections,
"TEST",
Namespace.of("namespace1"),
TableIdentifier.of(Namespace.of("namespace1"), "table1"),
"testLocation");
JdbcUtil.doCommitCreateTable(
JdbcUtil.SchemaVersion.V0,
connections,
"TEST",
Namespace.of("namespace1"),
TableIdentifier.of(Namespace.of("namespace1"), "table2"),
"testLocation");

try (PreparedStatement statement =
connections.newClient().prepareStatement(JdbcUtil.V0_LIST_TABLE_SQL)) {
statement.setString(1, "TEST");
statement.setString(2, "namespace1");
ResultSet tables = statement.executeQuery();
tables.next();
assertThat(tables.getString(JdbcUtil.TABLE_NAME)).isEqualTo("table1");
tables.next();
assertThat(tables.getString(JdbcUtil.TABLE_NAME)).isEqualTo("table2");
}

// updating the schema from V0 to V1
connections.newClient().prepareStatement(JdbcUtil.V1_UPDATE_CATALOG_SQL).execute();

// trying to add a table on the updated schema
JdbcUtil.doCommitCreateTable(
JdbcUtil.SchemaVersion.V1,
connections,
"TEST",
Namespace.of("namespace1"),
TableIdentifier.of(Namespace.of("namespace1"), "table3"),
"testLocation");

// testing the tables after migration and new table added
try (PreparedStatement statement =
connections.newClient().prepareStatement(JdbcUtil.V0_LIST_TABLE_SQL)) {
statement.setString(1, "TEST");
statement.setString(2, "namespace1");
ResultSet tables = statement.executeQuery();
tables.next();
assertThat(tables.getString(JdbcUtil.TABLE_NAME)).isEqualTo("table1");
assertThat(tables.getString(JdbcUtil.RECORD_TYPE)).isNull();
tables.next();
assertThat(tables.getString(JdbcUtil.TABLE_NAME)).isEqualTo("table2");
assertThat(tables.getString(JdbcUtil.RECORD_TYPE)).isNull();
tables.next();
assertThat(tables.getString(JdbcUtil.TABLE_NAME)).isEqualTo("table3");
assertThat(tables.getString(JdbcUtil.RECORD_TYPE)).isEqualTo(JdbcUtil.TABLE_RECORD_TYPE);
}

// update a table (commit) created on V1 schema
int updated =
JdbcUtil.updateTable(
JdbcUtil.SchemaVersion.V1,
connections,
"TEST",
TableIdentifier.of(Namespace.of("namespace1"), "table3"),
"newLocation",
"testLocation");
assertThat(updated).isEqualTo(1);

// update a table (commit) migrated from V0 schema
updated =
JdbcUtil.updateTable(
JdbcUtil.SchemaVersion.V1,
connections,
"TEST",
TableIdentifier.of(Namespace.of("namespace1"), "table1"),
"newLocation",
"testLocation");
assertThat(updated).isEqualTo(1);
}
}
}

0 comments on commit c45c9f6

Please sign in to comment.