Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply INCREASE_CONTAINER_SIZE execution strategy for user supplied error codes #21513

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Map;

import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.spark.SparkErrorCode.SPARK_EXECUTOR_OOM;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT;
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.dataSizeProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.doubleProperty;
Expand Down Expand Up @@ -54,6 +56,7 @@ public class PrestoSparkSessionProperties
public static final String SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED = "spark_retry_on_out_of_memory_with_increased_memory_settings_enabled";
public static final String OUT_OF_MEMORY_RETRY_PRESTO_SESSION_PROPERTIES = "out_of_memory_retry_presto_session_properties";
public static final String OUT_OF_MEMORY_RETRY_SPARK_CONFIGS = "out_of_memory_retry_spark_configs";
public static final String SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES = "spark_retry_on_out_of_memory_with_increased_memory_settings_error_codes";
public static final String SPARK_AVERAGE_INPUT_DATA_SIZE_PER_EXECUTOR = "spark_average_input_data_size_per_executor";
public static final String SPARK_MAX_EXECUTOR_COUNT = "spark_max_executor_count";
public static final String SPARK_MIN_EXECUTOR_COUNT = "spark_min_executor_count";
Expand Down Expand Up @@ -184,6 +187,15 @@ public PrestoSparkSessionProperties(PrestoSparkConfig prestoSparkConfig)
true,
value -> MAP_SPLITTER.split(nullToEmpty((String) value)),
value -> value),
new PropertyMetadata<>(
SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES,
"Error Codes to retry with increase memory settings",
VARCHAR,
List.class,
ImmutableList.of(EXCEEDED_LOCAL_MEMORY_LIMIT.name(), SPARK_EXECUTOR_OOM.name()),
false,
value -> Splitter.on(',').trimResults().omitEmptyStrings().splitToList(value.toString().toUpperCase()),
value -> value),
dataSizeProperty(
SPARK_AVERAGE_INPUT_DATA_SIZE_PER_EXECUTOR,
"Average input data size per executor",
Expand Down Expand Up @@ -351,6 +363,11 @@ public static Map<String, String> getOutOfMemoryRetrySparkConfigs(Session sessio
return session.getSystemProperty(OUT_OF_MEMORY_RETRY_SPARK_CONFIGS, Map.class);
}

public static List<String> getRetryOnOutOfMemoryWithIncreasedMemoryErrorCodes(Session session)
{
return session.getSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES, List.class);
}

public static DataSize getAverageInputDataSizePerExecutor(Session session)
{
return session.getSystemProperty(SPARK_AVERAGE_INPUT_DATA_SIZE_PER_EXECUTOR, DataSize.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,15 @@
import java.util.List;

import static com.facebook.presto.execution.ExecutionFailureInfo.toStackTraceElement;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.getRetryOnOutOfMemoryWithIncreasedMemoryErrorCodes;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.isRetryOnOutOfMemoryBroadcastJoinEnabled;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.isRetryOnOutOfMemoryWithHigherHashPartitionCountEnabled;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.isRetryOnOutOfMemoryWithIncreasedMemoryEnabled;
import static com.facebook.presto.spark.SparkErrorCode.SPARK_EXECUTOR_OOM;
import static com.facebook.presto.spark.classloader_interface.ExecutionStrategy.DISABLE_BROADCAST_JOIN;
import static com.facebook.presto.spark.classloader_interface.ExecutionStrategy.INCREASE_CONTAINER_SIZE;
import static com.facebook.presto.spark.classloader_interface.ExecutionStrategy.INCREASE_HASH_PARTITION_COUNT;
import static com.facebook.presto.spi.ErrorCause.LOW_PARTITION_COUNT;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_LOCAL_BROADCAST_JOIN_MEMORY_LIMIT;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -103,8 +102,8 @@ private static List<ExecutionStrategy> getRetryExecutionStrategies(Session sessi
strategies.add(DISABLE_BROADCAST_JOIN);
}

if (isRetryOnOutOfMemoryWithIncreasedMemoryEnabled(session)
&& (errorCode.equals(EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode()) || errorCode.equals(SPARK_EXECUTOR_OOM.toErrorCode()))) {
if (isRetryOnOutOfMemoryWithIncreasedMemoryEnabled(session) &&
getRetryOnOutOfMemoryWithIncreasedMemoryErrorCodes(session).contains(errorCode.getName().toUpperCase())) {
strategies.add(INCREASE_CONTAINER_SIZE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_RETRY_ON_OUT_OF_MEMORY_BROADCAST_JOIN_ENABLED;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_RETRY_ON_OUT_OF_MEMORY_HIGHER_PARTITION_COUNT_ENABLED;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_SPLIT_ASSIGNMENT_BATCH_SIZE;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.STORAGE_BASED_BROADCAST_JOIN_ENABLED;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.PartialMergePushdownStrategy.PUSH_THROUGH_LOW_MEMORY_OPERATORS;
Expand Down Expand Up @@ -1078,6 +1079,53 @@ public void testRetryOnOutOfMemoryWithIncreasedContainerSize()
"select * from lineitem l join orders o on l.orderkey = o.orderkey");
}

@Test
public void testRetryOnOutOfMemoryWithIncreasedContainerSizeWithSessionPropertiesProvidedErrorCode()
{
Session session = Session.builder(getSession())
.setSystemProperty(QUERY_MAX_MEMORY_PER_NODE, "2MB")
.setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "2MB")
.setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED, "false")
.build();

// Query should fail with OOM
assertQueryFails(
session,
"select * from lineitem l join orders o on l.orderkey = o.orderkey",
".*Query exceeded per-node .* memory limit of 2MB.*");

session = Session.builder(getSession())
.setSystemProperty(QUERY_MAX_MEMORY_PER_NODE, "2MB")
.setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "2MB")
.setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED, "true")
// Do not provide any error code. INCREASE_CONTAINER_SIZE strategy won't be applied
.setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES, "")
.setSystemProperty(OUT_OF_MEMORY_RETRY_PRESTO_SESSION_PROPERTIES, "query_max_memory_per_node=100MB,query_max_total_memory_per_node=100MB")
.setSystemProperty(OUT_OF_MEMORY_RETRY_SPARK_CONFIGS, "spark.executor.memory=1G")
.build();

// Query should fail with OOM
assertQueryFails(
session,
"select * from lineitem l join orders o on l.orderkey = o.orderkey",
".*Query exceeded per-node .* memory limit of 2MB.*");

session = Session.builder(getSession())
.setSystemProperty(QUERY_MAX_MEMORY_PER_NODE, "2MB")
.setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "2MB")
.setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED, "true")
// Support EXCEEDED_LOCAL_MEMORY_LIMIT as the retry error code. INCREASE_CONTAINER_SIZE strategy will be applied
.setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES, "EXCEEDED_LOCAL_MEMORY_LIMIT")
.setSystemProperty(OUT_OF_MEMORY_RETRY_PRESTO_SESSION_PROPERTIES, "query_max_memory_per_node=100MB,query_max_total_memory_per_node=100MB")
.setSystemProperty(OUT_OF_MEMORY_RETRY_SPARK_CONFIGS, "spark.executor.memory=1G")
.build();

// Query should succeed since memory will be increased on retry
assertQuery(
session,
"select * from lineitem l join orders o on l.orderkey = o.orderkey");
}

@Test
public void testSmileSerialization()
{
Expand Down
Loading