Skip to content

Commit

Permalink
Spark 3.4: Refactor JobGroupUtils (#8418)
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi authored Aug 29, 2023
1 parent a2737a7 commit 96bda46
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@

/** Captures information about the current job which is used for displaying on the UI */
public class JobGroupInfo {
private String groupId;
private String description;
private boolean interruptOnCancel;
private final String groupId;
private final String description;
private final boolean interruptOnCancel;

/**
 * Creates a job group that will NOT interrupt running tasks on cancellation
 * (delegates to the three-arg constructor with {@code interruptOnCancel = false}).
 */
public JobGroupInfo(String groupId, String desc) {
this(groupId, desc, false);
}

public JobGroupInfo(String groupId, String desc, boolean interruptOnCancel) {
this.groupId = groupId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.apache.iceberg.spark;

import java.util.function.Supplier;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkContext$;
import org.apache.spark.api.java.JavaSparkContext;

public class JobGroupUtils {

Expand All @@ -43,4 +45,20 @@ public static void setJobGroupInfo(SparkContext sparkContext, JobGroupInfo info)
sparkContext.setLocalProperty(
JOB_INTERRUPT_ON_CANCEL, String.valueOf(info.interruptOnCancel()));
}

/**
 * Runs {@code supplier} with {@code info} applied as the active Spark job group and restores
 * the previously active job group afterwards. Convenience overload that unwraps the Java
 * context to the underlying Scala {@link SparkContext} and delegates to the primary overload.
 *
 * @param sparkContext Java Spark context wrapping the target SparkContext
 * @param info job group to apply while the supplier runs
 * @param supplier action to execute under the job group
 * @return the value produced by {@code supplier}
 */
public static <T> T withJobGroupInfo(
    JavaSparkContext sparkContext, JobGroupInfo info, Supplier<T> supplier) {
  SparkContext scalaContext = sparkContext.sc();
  return withJobGroupInfo(scalaContext, info, supplier);
}

/**
 * Runs {@code supplier} with {@code info} applied as the active Spark job group, then
 * reinstates whatever job group was in effect before — even when the supplier throws.
 *
 * @param sparkContext Spark context whose local job group properties are temporarily replaced
 * @param info job group to apply while the supplier runs
 * @param supplier action to execute under the job group
 * @return the value produced by {@code supplier}
 */
public static <T> T withJobGroupInfo(
    SparkContext sparkContext, JobGroupInfo info, Supplier<T> supplier) {
  // snapshot the currently active job group so it can be reinstated below
  JobGroupInfo priorInfo = getJobGroupInfo(sparkContext);
  try {
    setJobGroupInfo(sparkContext, info);
    return supplier.get();
  } finally {
    // always restore the prior job group, whether the action succeeded or failed
    setJobGroupInfo(sparkContext, priorInfo);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
Expand Down Expand Up @@ -130,18 +129,11 @@ protected Map<String, String> options() {
}

/**
 * Runs {@code supplier} with {@code info} set as the Spark job group, restoring the previous
 * job group when done. Delegates the save/set/restore handling to
 * {@code JobGroupUtils.withJobGroupInfo} instead of duplicating the try/finally dance here.
 *
 * <p>NOTE(review): this span was diff residue containing both the pre- and post-refactor
 * bodies (invalid Java); reconstructed to the applied post-commit form. Assumes
 * {@code sparkContext} is a context field on the enclosing action class, as the added diff
 * line shows — confirm against the full file.
 *
 * @param info job group to apply while the supplier runs
 * @param supplier action to execute under the job group
 * @return the value produced by {@code supplier}
 */
protected <T> T withJobGroupInfo(JobGroupInfo info, Supplier<T> supplier) {
  return JobGroupUtils.withJobGroupInfo(sparkContext, info, supplier);
}

/**
 * Creates a {@link JobGroupInfo} with a unique group ID formed by appending a monotonically
 * increasing counter value to {@code groupId}. Uses the two-arg constructor, which defaults
 * {@code interruptOnCancel} to {@code false}.
 *
 * <p>NOTE(review): this span was diff residue with two return statements (pre- and
 * post-refactor); reconstructed to the applied post-commit form.
 *
 * @param groupId base identifier for the job group
 * @param desc human-readable description shown on the Spark UI
 * @return a new job group with a unique ID
 */
protected JobGroupInfo newJobGroupInfo(String groupId, String desc) {
  return new JobGroupInfo(groupId + "-" + JOB_COUNTER.incrementAndGet(), desc);
}

protected Table newStaticTable(TableMetadata metadata, FileIO io) {
Expand Down

0 comments on commit 96bda46

Please sign in to comment.