Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark3.1, Spark3.2, Spark3.3: Support setting current snapshot with ref #8392

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assume;
import org.junit.Test;
Expand Down Expand Up @@ -219,14 +220,14 @@ public void testInvalidRollbackToSnapshotCases() {

AssertHelpers.assertThrows(
"Should reject calls without all required args",
AnalysisException.class,
"Missing required parameters",
IllegalArgumentException.class,
"Either snapshot_id or ref must be provided, not both",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error message doesn't seem right for the arguments that are being passed in. We're missing the required parameters right? Also I don't think we should change the exception that is being thrown for the existing cases (missing required parameters) because we probably want to preserve that behavior in case users rely on it. For the new failure case (ref + snapshot ID) the IllegalArgumentException is fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean throw AnalysisException when both snapshot_id and ref are missing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aokolnychyi what do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amogh-jahagirdar, @aokolnychyi please share your thoughts here.

() -> sql("CALL %s.system.set_current_snapshot('t')", catalogName));

AssertHelpers.assertThrows(
"Should reject calls without all required args",
AnalysisException.class,
"Missing required parameters",
IllegalArgumentException.class,
"Cannot parse identifier for arg table: 1",
() -> sql("CALL %s.system.set_current_snapshot(1L)", catalogName));

AssertHelpers.assertThrows(
Expand All @@ -237,8 +238,8 @@ public void testInvalidRollbackToSnapshotCases() {

AssertHelpers.assertThrows(
"Should reject calls without all required args",
AnalysisException.class,
"Missing required parameters",
IllegalArgumentException.class,
"Either snapshot_id or ref must be provided, not both",
() -> sql("CALL %s.system.set_current_snapshot(table => 't')", catalogName));

AssertHelpers.assertThrows(
Expand All @@ -252,5 +253,58 @@ public void testInvalidRollbackToSnapshotCases() {
IllegalArgumentException.class,
"Cannot handle an empty identifier",
() -> sql("CALL %s.system.set_current_snapshot('', 1L)", catalogName));

Assertions.assertThatThrownBy(
() ->
sql(
"CALL %s.system.set_current_snapshot(table => 't', snapshot_id => 1L, ref => 's1')",
catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Either snapshot_id or ref must be provided, not both");
}

@Test
public void testSetCurrentSnapshotToRef() {
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);

Table table = validationCatalog.loadTable(tableIdent);
Snapshot firstSnapshot = table.currentSnapshot();
String ref = "s1";
sql("ALTER TABLE %s CREATE TAG %s", tableName, ref);

sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
assertEquals(
"Should have expected rows",
ImmutableList.of(row(1L, "a"), row(1L, "a")),
sql("SELECT * FROM %s ORDER BY id", tableName));

table.refresh();

Snapshot secondSnapshot = table.currentSnapshot();

List<Object[]> output =
sql(
"CALL %s.system.set_current_snapshot(table => '%s', ref => '%s')",
catalogName, tableIdent, ref);

assertEquals(
"Procedure output must match",
ImmutableList.of(row(secondSnapshot.snapshotId(), firstSnapshot.snapshotId())),
output);

assertEquals(
"Set must be successful",
ImmutableList.of(row(1L, "a")),
sql("SELECT * FROM %s ORDER BY id", tableName));

String notExistRef = "s2";
Assertions.assertThatThrownBy(
() ->
sql(
"CALL %s.system.set_current_snapshot(table => '%s', ref => '%s')",
catalogName, tableIdent, notExistRef))
.isInstanceOf(ValidationException.class)
.hasMessage("Cannot find matching snapshot ID for ref " + notExistRef);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
package org.apache.iceberg.spark.procedures;

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
Expand All @@ -42,7 +46,8 @@ class SetCurrentSnapshotProcedure extends BaseProcedure {
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", DataTypes.StringType),
ProcedureParameter.required("snapshot_id", DataTypes.LongType)
ProcedureParameter.optional("snapshot_id", DataTypes.LongType),
ProcedureParameter.optional("ref", DataTypes.StringType)
};

private static final StructType OUTPUT_TYPE =
Expand Down Expand Up @@ -78,17 +83,22 @@ public StructType outputType() {
@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
long snapshotId = args.getLong(1);
Long snapshotId = args.isNullAt(1) ? null : args.getLong(1);
String ref = args.isNullAt(2) ? null : args.getString(2);
Preconditions.checkArgument(
(snapshotId != null && ref == null) || (snapshotId == null && ref != null),
"Either snapshot_id or ref must be provided, not both");

return modifyIcebergTable(
tableIdent,
table -> {
Snapshot previousSnapshot = table.currentSnapshot();
Long previousSnapshotId = previousSnapshot != null ? previousSnapshot.snapshotId() : null;

table.manageSnapshots().setCurrentSnapshot(snapshotId).commit();
long targetSnapshotId = snapshotId != null ? snapshotId : toSnapshotId(table, ref);
table.manageSnapshots().setCurrentSnapshot(targetSnapshotId).commit();

InternalRow outputRow = newInternalRow(previousSnapshotId, snapshotId);
InternalRow outputRow = newInternalRow(previousSnapshotId, targetSnapshotId);
return new InternalRow[] {outputRow};
});
}
Expand All @@ -97,4 +107,10 @@ public InternalRow[] call(InternalRow args) {
public String description() {
return "SetCurrentSnapshotProcedure";
}

private long toSnapshotId(Table table, String refName) {
SnapshotRef ref = table.refs().get(refName);
ValidationException.check(ref != null, "Cannot find matching snapshot ID for ref " + refName);
return ref.snapshotId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assume;
import org.junit.Test;
Expand Down Expand Up @@ -219,14 +220,14 @@ public void testInvalidRollbackToSnapshotCases() {

AssertHelpers.assertThrows(
"Should reject calls without all required args",
AnalysisException.class,
"Missing required parameters",
IllegalArgumentException.class,
"Either snapshot_id or ref must be provided, not both",
() -> sql("CALL %s.system.set_current_snapshot('t')", catalogName));

AssertHelpers.assertThrows(
"Should reject calls without all required args",
AnalysisException.class,
"Missing required parameters",
IllegalArgumentException.class,
"Cannot parse identifier for arg table: 1",
() -> sql("CALL %s.system.set_current_snapshot(1L)", catalogName));

AssertHelpers.assertThrows(
Expand All @@ -237,8 +238,8 @@ public void testInvalidRollbackToSnapshotCases() {

AssertHelpers.assertThrows(
"Should reject calls without all required args",
AnalysisException.class,
"Missing required parameters",
IllegalArgumentException.class,
"Either snapshot_id or ref must be provided, not both",
() -> sql("CALL %s.system.set_current_snapshot(table => 't')", catalogName));

AssertHelpers.assertThrows(
Expand All @@ -252,5 +253,58 @@ public void testInvalidRollbackToSnapshotCases() {
IllegalArgumentException.class,
"Cannot handle an empty identifier",
() -> sql("CALL %s.system.set_current_snapshot('', 1L)", catalogName));

Assertions.assertThatThrownBy(
() ->
sql(
"CALL %s.system.set_current_snapshot(table => 't', snapshot_id => 1L, ref => 's1')",
catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Either snapshot_id or ref must be provided, not both");
}

@Test
public void testSetCurrentSnapshotToRef() {
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);

Table table = validationCatalog.loadTable(tableIdent);
Snapshot firstSnapshot = table.currentSnapshot();
String ref = "s1";
sql("ALTER TABLE %s CREATE TAG %s", tableName, ref);

sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
assertEquals(
"Should have expected rows",
ImmutableList.of(row(1L, "a"), row(1L, "a")),
sql("SELECT * FROM %s ORDER BY id", tableName));

table.refresh();

Snapshot secondSnapshot = table.currentSnapshot();

List<Object[]> output =
sql(
"CALL %s.system.set_current_snapshot(table => '%s', ref => '%s')",
catalogName, tableIdent, ref);

assertEquals(
"Procedure output must match",
ImmutableList.of(row(secondSnapshot.snapshotId(), firstSnapshot.snapshotId())),
output);

assertEquals(
"Set must be successful",
ImmutableList.of(row(1L, "a")),
sql("SELECT * FROM %s ORDER BY id", tableName));

String notExistRef = "s2";
Assertions.assertThatThrownBy(
() ->
sql(
"CALL %s.system.set_current_snapshot(table => '%s', ref => '%s')",
catalogName, tableIdent, notExistRef))
.isInstanceOf(ValidationException.class)
.hasMessage("Cannot find matching snapshot ID for ref " + notExistRef);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
package org.apache.iceberg.spark.procedures;

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
Expand All @@ -42,7 +46,8 @@ class SetCurrentSnapshotProcedure extends BaseProcedure {
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", DataTypes.StringType),
ProcedureParameter.required("snapshot_id", DataTypes.LongType)
ProcedureParameter.optional("snapshot_id", DataTypes.LongType),
ProcedureParameter.optional("ref", DataTypes.StringType)
};

private static final StructType OUTPUT_TYPE =
Expand Down Expand Up @@ -78,17 +83,22 @@ public StructType outputType() {
@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
long snapshotId = args.getLong(1);
Long snapshotId = args.isNullAt(1) ? null : args.getLong(1);
String ref = args.isNullAt(2) ? null : args.getString(2);
Preconditions.checkArgument(
(snapshotId != null && ref == null) || (snapshotId == null && ref != null),
"Either snapshot_id or ref must be provided, not both");

return modifyIcebergTable(
tableIdent,
table -> {
Snapshot previousSnapshot = table.currentSnapshot();
Long previousSnapshotId = previousSnapshot != null ? previousSnapshot.snapshotId() : null;

table.manageSnapshots().setCurrentSnapshot(snapshotId).commit();
long targetSnapshotId = snapshotId != null ? snapshotId : toSnapshotId(table, ref);
table.manageSnapshots().setCurrentSnapshot(targetSnapshotId).commit();

InternalRow outputRow = newInternalRow(previousSnapshotId, snapshotId);
InternalRow outputRow = newInternalRow(previousSnapshotId, targetSnapshotId);
return new InternalRow[] {outputRow};
});
}
Expand All @@ -97,4 +107,10 @@ public InternalRow[] call(InternalRow args) {
public String description() {
return "SetCurrentSnapshotProcedure";
}

private long toSnapshotId(Table table, String refName) {
SnapshotRef ref = table.refs().get(refName);
ValidationException.check(ref != null, "Cannot find matching snapshot ID for ref " + refName);
return ref.snapshotId();
}
}
Loading