Skip to content

Commit

Permalink
Spark: Backport #10373 to Spark 3.3 and 3.4 (#10546)
Browse files Browse the repository at this point in the history
  • Loading branch information
amogh-jahagirdar authored Jun 21, 2024
1 parent 1ec69d1 commit e57b9f6
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.ImmutableRewriteManifests;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.OutputFile;
Expand Down Expand Up @@ -345,8 +346,11 @@ private void replaceManifests(
// don't clean up added manifest files, because they may have been successfully committed.
throw commitStateUnknownException;
} catch (Exception e) {
// delete all new manifests because the rewrite failed
deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
if (e instanceof CleanableFailure) {
// delete all new manifests because the rewrite failed
deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
}

throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.BasePositionDeltaWriter;
Expand Down Expand Up @@ -102,7 +102,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
private final Distribution requiredDistribution;
private final SortOrder[] requiredOrdering;

private boolean cleanupOnAbort = true;
private boolean cleanupOnAbort = false;

SparkPositionDeltaWrite(
SparkSession spark,
Expand Down Expand Up @@ -284,9 +284,9 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
operation.commit(); // abort is automatically called if this fails
long duration = System.currentTimeMillis() - start;
LOG.info("Committed in {} ms", duration);
} catch (CommitStateUnknownException commitStateUnknownException) {
cleanupOnAbort = false;
throw commitStateUnknownException;
} catch (Exception e) {
cleanupOnAbort = e instanceof CleanableFailure;
throw e;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ClusteredDataWriter;
Expand Down Expand Up @@ -101,7 +101,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
private final Distribution requiredDistribution;
private final SortOrder[] requiredOrdering;

private boolean cleanupOnAbort = true;
private boolean cleanupOnAbort = false;

SparkWrite(
SparkSession spark,
Expand Down Expand Up @@ -216,9 +216,9 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
operation.commit(); // abort is automatically called if this fails
long duration = System.currentTimeMillis() - start;
LOG.info("Committed in {} ms", duration);
} catch (CommitStateUnknownException commitStateUnknownException) {
cleanupOnAbort = false;
throw commitStateUnknownException;
} catch (Exception e) {
cleanupOnAbort = e instanceof CleanableFailure;
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.ImmutableRewriteManifests;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.OutputFile;
Expand Down Expand Up @@ -355,8 +356,11 @@ private void replaceManifests(
// don't clean up added manifest files, because they may have been successfully committed.
throw commitStateUnknownException;
} catch (Exception e) {
// delete all new manifests because the rewrite failed
deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
if (e instanceof CleanableFailure) {
// delete all new manifests because the rewrite failed
deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
}

throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.BasePositionDeltaWriter;
Expand Down Expand Up @@ -105,7 +105,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
private final Context context;
private final Map<String, String> writeProperties;

private boolean cleanupOnAbort = true;
private boolean cleanupOnAbort = false;

SparkPositionDeltaWrite(
SparkSession spark,
Expand Down Expand Up @@ -293,9 +293,9 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
operation.commit(); // abort is automatically called if this fails
long duration = System.currentTimeMillis() - start;
LOG.info("Committed in {} ms", duration);
} catch (CommitStateUnknownException commitStateUnknownException) {
cleanupOnAbort = false;
throw commitStateUnknownException;
} catch (Exception e) {
cleanupOnAbort = e instanceof CleanableFailure;
throw e;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ClusteredDataWriter;
Expand Down Expand Up @@ -102,7 +102,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
private final SparkWriteRequirements writeRequirements;
private final Map<String, String> writeProperties;

private boolean cleanupOnAbort = true;
private boolean cleanupOnAbort = false;

SparkWrite(
SparkSession spark,
Expand Down Expand Up @@ -222,9 +222,9 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
operation.commit(); // abort is automatically called if this fails
long duration = System.currentTimeMillis() - start;
LOG.info("Committed in {} ms", duration);
} catch (CommitStateUnknownException commitStateUnknownException) {
cleanupOnAbort = false;
throw commitStateUnknownException;
} catch (Exception e) {
cleanupOnAbort = e instanceof CleanableFailure;
throw e;
}
}

Expand Down

0 comments on commit e57b9f6

Please sign in to comment.