Skip to content

Commit

Permalink
Revert "[apache#706] feat(spark): support spill to avoid memory deadlock (apache#714)"
Browse files Browse the repository at this point in the history

This reverts commit 1b48c12.
  • Loading branch information
jerqi committed Apr 11, 2023
1 parent cae7cd9 commit e73d339
Show file tree
Hide file tree
Showing 15 changed files with 314 additions and 782 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,12 @@
import scala.runtime.AbstractFunction1;

import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.config.ConfigOption;
import org.apache.uniffle.common.config.ConfigOptions;
import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;

public class RssSparkConfig {

// Upper bound, in bytes, on the amount of shuffle data sent to the shuffle
// server in one batch; defaults to 16 MiB (1024 * 1024 * 16).
public static final ConfigOption<Long> RSS_CLIENT_SEND_SIZE_LIMITATION = ConfigOptions
.key("rss.client.send.size.limit")
.longType()
.defaultValue(1024 * 1024 * 16L)
.withDescription("The max data size sent to shuffle server");

// Timeout, in seconds, for spilling in-memory shuffle data to the remote
// shuffle server when Spark's TaskMemoryManager asks this writer to free
// memory; defaults to 1 second.
public static final ConfigOption<Integer> RSS_MEMORY_SPILL_TIMEOUT = ConfigOptions
.key("rss.client.memory.spill.timeout.sec")
.intType()
.defaultValue(1)
.withDescription("The timeout of spilling data to remote shuffle server, "
+ "which will be triggered by Spark TaskMemoryManager. Unit is sec, default value is 1");

public static final String SPARK_RSS_CONFIG_PREFIX = "spark.";

public static final ConfigEntry<Integer> RSS_PARTITION_NUM_PER_RANGE = createIntegerBuilder(
Expand Down Expand Up @@ -131,6 +116,11 @@ public class RssSparkConfig {
new ConfigBuilder("spark.rss.client.heartBeat.threadNum"))
.createWithDefault(4);

// Max data size sent to the shuffle server per batch, expressed as a string
// with a size suffix (default "16m").
public static final ConfigEntry<String> RSS_CLIENT_SEND_SIZE_LIMIT = createStringBuilder(
new ConfigBuilder("spark.rss.client.send.size.limit")
.doc("The max data size sent to shuffle server"))
.createWithDefault("16m");

// Size of the thread pool used when unregistering shuffle data from shuffle
// servers; defaults to 10 threads.
public static final ConfigEntry<Integer> RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE = createIntegerBuilder(
new ConfigBuilder("spark.rss.client.unregister.thread.pool.size"))
.createWithDefault(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.shuffle.writer;

import java.util.ArrayList;
import java.util.List;

import org.apache.uniffle.common.ShuffleBlockInfo;
Expand All @@ -26,26 +25,10 @@ public class AddBlockEvent {

private String taskId;
private List<ShuffleBlockInfo> shuffleDataInfoList;
private List<Runnable> processedCallbackChain;

/**
 * Creates an event carrying the given shuffle blocks for a task, starting
 * with an empty post-processing callback chain.
 *
 * @param taskId id of the task that produced the blocks
 * @param shuffleDataInfoList shuffle blocks to be sent to the shuffle server
 */
public AddBlockEvent(String taskId, List<ShuffleBlockInfo> shuffleDataInfoList) {
this.processedCallbackChain = new ArrayList<>();
this.taskId = taskId;
this.shuffleDataInfoList = shuffleDataInfoList;
}

/**
 * Creates an event carrying the given shuffle blocks for a task and
 * registers an initial callback to run after the event is processed.
 *
 * @param taskId id of the task that produced the blocks
 * @param shuffleBlockInfoList shuffle blocks to be sent to the shuffle server
 * @param callback first callback of the chain; must execute fast and not throw
 */
public AddBlockEvent(String taskId, List<ShuffleBlockInfo> shuffleBlockInfoList, Runnable callback) {
this(taskId, shuffleBlockInfoList);
addCallback(callback);
}

/**
 * Appends a callback to be run once this event has been processed.
 * The callback must execute quickly and must not throw any exception.
 *
 * @param callback action to append to the processed-callback chain
 */
public void addCallback(Runnable callback) {
this.processedCallbackChain.add(callback);
}

public String getTaskId() {
Expand All @@ -56,10 +39,6 @@ public List<ShuffleBlockInfo> getShuffleDataInfoList() {
return shuffleDataInfoList;
}

/** @return the callbacks to run after this event has been processed */
public List<Runnable> getProcessedCallbackChain() {
return this.processedCallbackChain;
}

@Override
public String toString() {
return "AddBlockEvent: TaskId[" + taskId + "], " + shuffleDataInfoList;
Expand Down

This file was deleted.

Loading

0 comments on commit e73d339

Please sign in to comment.