Skip to content

Commit

Permalink
Apply INCREASE_CONTAINER_SIZE execution strategy for user supplied er…
Browse files Browse the repository at this point in the history
…ror codes
  • Loading branch information
pgupta2 committed Dec 12, 2023
1 parent d9fc66d commit c166b05
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Map;

import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.spark.SparkErrorCode.SPARK_EXECUTOR_OOM;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT;
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.dataSizeProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.doubleProperty;
Expand Down Expand Up @@ -54,6 +56,7 @@ public class PrestoSparkSessionProperties
public static final String SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED = "spark_retry_on_out_of_memory_with_increased_memory_settings_enabled";
public static final String OUT_OF_MEMORY_RETRY_PRESTO_SESSION_PROPERTIES = "out_of_memory_retry_presto_session_properties";
public static final String OUT_OF_MEMORY_RETRY_SPARK_CONFIGS = "out_of_memory_retry_spark_configs";
public static final String SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES = "spark_retry_on_out_of_memory_with_increased_memory_settings_error_codes";
public static final String SPARK_AVERAGE_INPUT_DATA_SIZE_PER_EXECUTOR = "spark_average_input_data_size_per_executor";
public static final String SPARK_MAX_EXECUTOR_COUNT = "spark_max_executor_count";
public static final String SPARK_MIN_EXECUTOR_COUNT = "spark_min_executor_count";
Expand Down Expand Up @@ -184,6 +187,15 @@ public PrestoSparkSessionProperties(PrestoSparkConfig prestoSparkConfig)
true,
value -> MAP_SPLITTER.split(nullToEmpty((String) value)),
value -> value),
new PropertyMetadata<>(
SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES,
"Error Codes to retry with increase memory settings",
VARCHAR,
List.class,
ImmutableList.of(EXCEEDED_LOCAL_MEMORY_LIMIT.name(), SPARK_EXECUTOR_OOM.name()),
false,
value -> Splitter.on(',').trimResults().omitEmptyStrings().splitToList(value.toString().toUpperCase()),
value -> value),
dataSizeProperty(
SPARK_AVERAGE_INPUT_DATA_SIZE_PER_EXECUTOR,
"Average input data size per executor",
Expand Down Expand Up @@ -351,6 +363,11 @@ public static Map<String, String> getOutOfMemoryRetrySparkConfigs(Session sessio
return session.getSystemProperty(OUT_OF_MEMORY_RETRY_SPARK_CONFIGS, Map.class);
}

public static List<String> getRetryOnOutOfMemoryWithIncreasedMemoryErrorCodes(Session session)
{
return session.getSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES, List.class);
}

public static DataSize getAverageInputDataSizePerExecutor(Session session)
{
return session.getSystemProperty(SPARK_AVERAGE_INPUT_DATA_SIZE_PER_EXECUTOR, DataSize.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,15 @@
import java.util.List;

import static com.facebook.presto.execution.ExecutionFailureInfo.toStackTraceElement;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.getRetryOnOutOfMemoryWithIncreasedMemoryErrorCodes;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.isRetryOnOutOfMemoryBroadcastJoinEnabled;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.isRetryOnOutOfMemoryWithHigherHashPartitionCountEnabled;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.isRetryOnOutOfMemoryWithIncreasedMemoryEnabled;
import static com.facebook.presto.spark.SparkErrorCode.SPARK_EXECUTOR_OOM;
import static com.facebook.presto.spark.classloader_interface.ExecutionStrategy.DISABLE_BROADCAST_JOIN;
import static com.facebook.presto.spark.classloader_interface.ExecutionStrategy.INCREASE_CONTAINER_SIZE;
import static com.facebook.presto.spark.classloader_interface.ExecutionStrategy.INCREASE_HASH_PARTITION_COUNT;
import static com.facebook.presto.spi.ErrorCause.LOW_PARTITION_COUNT;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_LOCAL_BROADCAST_JOIN_MEMORY_LIMIT;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -103,8 +102,8 @@ private static List<ExecutionStrategy> getRetryExecutionStrategies(Session sessi
strategies.add(DISABLE_BROADCAST_JOIN);
}

if (isRetryOnOutOfMemoryWithIncreasedMemoryEnabled(session)
&& (errorCode.equals(EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode()) || errorCode.equals(SPARK_EXECUTOR_OOM.toErrorCode()))) {
if (isRetryOnOutOfMemoryWithIncreasedMemoryEnabled(session) &&
getRetryOnOutOfMemoryWithIncreasedMemoryErrorCodes(session).contains(errorCode.getName().toUpperCase())) {
strategies.add(INCREASE_CONTAINER_SIZE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_RETRY_ON_OUT_OF_MEMORY_BROADCAST_JOIN_ENABLED;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_RETRY_ON_OUT_OF_MEMORY_HIGHER_PARTITION_COUNT_ENABLED;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_SPLIT_ASSIGNMENT_BATCH_SIZE;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.STORAGE_BASED_BROADCAST_JOIN_ENABLED;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.PartialMergePushdownStrategy.PUSH_THROUGH_LOW_MEMORY_OPERATORS;
Expand Down Expand Up @@ -1078,6 +1079,53 @@ public void testRetryOnOutOfMemoryWithIncreasedContainerSize()
"select * from lineitem l join orders o on l.orderkey = o.orderkey");
}

@Test
public void testRetryOnOutOfMemoryWithIncreasedContainerSizeWithSessionPropertiesProvidedErrorCode()
{
Session session = Session.builder(getSession())
.setSystemProperty(QUERY_MAX_MEMORY_PER_NODE, "2MB")
.setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "2MB")
.setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED, "false")
.build();

// Query should fail with OOM
assertQueryFails(
session,
"select * from lineitem l join orders o on l.orderkey = o.orderkey",
".*Query exceeded per-node .* memory limit of 2MB.*");

session = Session.builder(getSession())
.setSystemProperty(QUERY_MAX_MEMORY_PER_NODE, "2MB")
.setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "2MB")
.setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED, "true")
// Do not provide any error code. INCREASE_CONTAINER_SIZE strategy won't be applied
.setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES, "")
.setSystemProperty(OUT_OF_MEMORY_RETRY_PRESTO_SESSION_PROPERTIES, "query_max_memory_per_node=100MB,query_max_total_memory_per_node=100MB")
.setSystemProperty(OUT_OF_MEMORY_RETRY_SPARK_CONFIGS, "spark.executor.memory=1G")
.build();

// Query should fail with OOM
assertQueryFails(
session,
"select * from lineitem l join orders o on l.orderkey = o.orderkey",
".*Query exceeded per-node .* memory limit of 2MB.*");

session = Session.builder(getSession())
.setSystemProperty(QUERY_MAX_MEMORY_PER_NODE, "2MB")
.setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "2MB")
.setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED, "true")
// Support EXCEEDED_LOCAL_MEMORY_LIMIT as the retry error code. INCREASE_CONTAINER_SIZE strategy will be applied
.setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES, "EXCEEDED_LOCAL_MEMORY_LIMIT")
.setSystemProperty(OUT_OF_MEMORY_RETRY_PRESTO_SESSION_PROPERTIES, "query_max_memory_per_node=100MB,query_max_total_memory_per_node=100MB")
.setSystemProperty(OUT_OF_MEMORY_RETRY_SPARK_CONFIGS, "spark.executor.memory=1G")
.build();

// Query should succeed since memory will be increased on retry
assertQuery(
session,
"select * from lineitem l join orders o on l.orderkey = o.orderkey");
}

@Test
public void testSmileSerialization()
{
Expand Down

0 comments on commit c166b05

Please sign in to comment.