diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSessionProperties.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSessionProperties.java index 5bf76d759b65..c066db99d9d9 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSessionProperties.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSessionProperties.java @@ -25,6 +25,8 @@ import java.util.Map; import static com.facebook.presto.common.type.VarcharType.VARCHAR; +import static com.facebook.presto.spark.SparkErrorCode.SPARK_EXECUTOR_OOM; +import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT; import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty; import static com.facebook.presto.spi.session.PropertyMetadata.dataSizeProperty; import static com.facebook.presto.spi.session.PropertyMetadata.doubleProperty; @@ -54,6 +56,7 @@ public class PrestoSparkSessionProperties public static final String SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED = "spark_retry_on_out_of_memory_with_increased_memory_settings_enabled"; public static final String OUT_OF_MEMORY_RETRY_PRESTO_SESSION_PROPERTIES = "out_of_memory_retry_presto_session_properties"; public static final String OUT_OF_MEMORY_RETRY_SPARK_CONFIGS = "out_of_memory_retry_spark_configs"; + public static final String SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES = "spark_retry_on_out_of_memory_with_increased_memory_settings_error_codes"; public static final String SPARK_AVERAGE_INPUT_DATA_SIZE_PER_EXECUTOR = "spark_average_input_data_size_per_executor"; public static final String SPARK_MAX_EXECUTOR_COUNT = "spark_max_executor_count"; public static final String SPARK_MIN_EXECUTOR_COUNT = "spark_min_executor_count"; @@ -184,6 +187,15 @@ public PrestoSparkSessionProperties(PrestoSparkConfig prestoSparkConfig) true, value -> MAP_SPLITTER.split(nullToEmpty((String) value)), value -> value), + new PropertyMetadata<>( + SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES, + "Error Codes to retry with increase memory settings", + VARCHAR, + List.class, + ImmutableList.of(EXCEEDED_LOCAL_MEMORY_LIMIT.name(), SPARK_EXECUTOR_OOM.name()), + false, + value -> Splitter.on(',').trimResults().omitEmptyStrings().splitToList(value.toString().toUpperCase()), + value -> value), dataSizeProperty( SPARK_AVERAGE_INPUT_DATA_SIZE_PER_EXECUTOR, "Average input data size per executor", @@ -351,6 +363,11 @@ public static Map getOutOfMemoryRetrySparkConfigs(Session sessio return session.getSystemProperty(OUT_OF_MEMORY_RETRY_SPARK_CONFIGS, Map.class); } + public static List getRetryOnOutOfMemoryWithIncreasedMemoryErrorCodes(Session session) + { + return session.getSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES, List.class); + } + public static DataSize getAverageInputDataSizePerExecutor(Session session) { return session.getSystemProperty(SPARK_AVERAGE_INPUT_DATA_SIZE_PER_EXECUTOR, DataSize.class); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/util/PrestoSparkFailureUtils.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/util/PrestoSparkFailureUtils.java index a52e2956ee6e..9ece36cbaa26 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/util/PrestoSparkFailureUtils.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/util/PrestoSparkFailureUtils.java @@ -26,16 +26,15 @@ import java.util.List; import static com.facebook.presto.execution.ExecutionFailureInfo.toStackTraceElement; +import static com.facebook.presto.spark.PrestoSparkSessionProperties.getRetryOnOutOfMemoryWithIncreasedMemoryErrorCodes; import static com.facebook.presto.spark.PrestoSparkSessionProperties.isRetryOnOutOfMemoryBroadcastJoinEnabled; import static com.facebook.presto.spark.PrestoSparkSessionProperties.isRetryOnOutOfMemoryWithHigherHashPartitionCountEnabled; import static com.facebook.presto.spark.PrestoSparkSessionProperties.isRetryOnOutOfMemoryWithIncreasedMemoryEnabled; -import static com.facebook.presto.spark.SparkErrorCode.SPARK_EXECUTOR_OOM; import static com.facebook.presto.spark.classloader_interface.ExecutionStrategy.DISABLE_BROADCAST_JOIN; import static com.facebook.presto.spark.classloader_interface.ExecutionStrategy.INCREASE_CONTAINER_SIZE; import static com.facebook.presto.spark.classloader_interface.ExecutionStrategy.INCREASE_HASH_PARTITION_COUNT; import static com.facebook.presto.spi.ErrorCause.LOW_PARTITION_COUNT; import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_LOCAL_BROADCAST_JOIN_MEMORY_LIMIT; -import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; @@ -103,8 +102,8 @@ private static List getRetryExecutionStrategies(Session sessi strategies.add(DISABLE_BROADCAST_JOIN); } - if (isRetryOnOutOfMemoryWithIncreasedMemoryEnabled(session) - && (errorCode.equals(EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode()) || errorCode.equals(SPARK_EXECUTOR_OOM.toErrorCode()))) { + if (isRetryOnOutOfMemoryWithIncreasedMemoryEnabled(session) && + getRetryOnOutOfMemoryWithIncreasedMemoryErrorCodes(session).contains(errorCode.getName().toUpperCase())) { strategies.add(INCREASE_CONTAINER_SIZE); } diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkQueryRunner.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkQueryRunner.java index 1b7afa75274d..31f227ff76c1 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkQueryRunner.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkQueryRunner.java @@ -49,6 +49,7 @@ import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_RETRY_ON_OUT_OF_MEMORY_BROADCAST_JOIN_ENABLED; import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_RETRY_ON_OUT_OF_MEMORY_HIGHER_PARTITION_COUNT_ENABLED; import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED; +import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES; import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_SPLIT_ASSIGNMENT_BATCH_SIZE; import static com.facebook.presto.spark.PrestoSparkSessionProperties.STORAGE_BASED_BROADCAST_JOIN_ENABLED; import static com.facebook.presto.sql.analyzer.FeaturesConfig.PartialMergePushdownStrategy.PUSH_THROUGH_LOW_MEMORY_OPERATORS; @@ -1078,6 +1079,53 @@ public void testRetryOnOutOfMemoryWithIncreasedContainerSize() "select * from lineitem l join orders o on l.orderkey = o.orderkey"); } + @Test + public void testRetryOnOutOfMemoryWithIncreasedContainerSizeWithSessionPropertiesProvidedErrorCode() + { + Session session = Session.builder(getSession()) + .setSystemProperty(QUERY_MAX_MEMORY_PER_NODE, "2MB") + .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "2MB") + .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED, "false") + .build(); + + // Query should fail with OOM + assertQueryFails( + session, + "select * from lineitem l join orders o on l.orderkey = o.orderkey", + ".*Query exceeded per-node .* memory limit of 2MB.*"); + + session = Session.builder(getSession()) + .setSystemProperty(QUERY_MAX_MEMORY_PER_NODE, "2MB") + .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "2MB") + .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED, "true") + // Do not provide any error code. INCREASE_CONTAINER_SIZE strategy won't be applied + .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES, "") + .setSystemProperty(OUT_OF_MEMORY_RETRY_PRESTO_SESSION_PROPERTIES, "query_max_memory_per_node=100MB,query_max_total_memory_per_node=100MB") + .setSystemProperty(OUT_OF_MEMORY_RETRY_SPARK_CONFIGS, "spark.executor.memory=1G") + .build(); + + // Query should fail with OOM + assertQueryFails( + session, + "select * from lineitem l join orders o on l.orderkey = o.orderkey", + ".*Query exceeded per-node .* memory limit of 2MB.*"); + + session = Session.builder(getSession()) + .setSystemProperty(QUERY_MAX_MEMORY_PER_NODE, "2MB") + .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "2MB") + .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED, "true") + // Support EXCEEDED_LOCAL_MEMORY_LIMIT as the retry error code. INCREASE_CONTAINER_SIZE strategy will be applied + .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES, "EXCEEDED_LOCAL_MEMORY_LIMIT") + .setSystemProperty(OUT_OF_MEMORY_RETRY_PRESTO_SESSION_PROPERTIES, "query_max_memory_per_node=100MB,query_max_total_memory_per_node=100MB") + .setSystemProperty(OUT_OF_MEMORY_RETRY_SPARK_CONFIGS, "spark.executor.memory=1G") + .build(); + + // Query should succeed since memory will be increased on retry + assertQuery( + session, + "select * from lineitem l join orders o on l.orderkey = o.orderkey"); + } + @Test public void testSmileSerialization() {