AIOPS-3232: As a data engineer, I want the default memory for Spark applications to be increased so it works out of the box for more than just toy examples
1 parent b63c0f7, commit b483ed2
Showing 10 changed files with 383 additions and 35 deletions.
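
Concretely, the migration targets the default driver and executor memory in each Spark application's base values file. An illustrative before/after excerpt (the full test fixture appears later in this diff):

sparkApp:
  spec:
    driver:
      memory: "512m"      # old default, too small for real workloads
    executor:
      memory: "512m"

becomes:

sparkApp:
  spec:
    driver:
      memory: "4096m"     # new default applied by the migration
    executor:
      memory: "4096m"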
...in/java/com/boozallen/aissemble/upgrade/migration/v1_7_0/SparkMemoryUpgradeMigration.java (84 additions, 0 deletions)
@@ -0,0 +1,84 @@
package com.boozallen.aissemble.upgrade.migration.v1_7_0;
/*-
 * #%L
 * aiSSEMBLE::Foundation::Upgrade
 * %%
 * Copyright (C) 2021 Booz Allen
 * %%
 * This software package is licensed under the Booz Allen Public License. All Rights Reserved.
 * #L%
 */
import com.boozallen.aissemble.upgrade.util.YamlUtils;
import com.boozallen.aissemble.upgrade.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.boozallen.aissemble.upgrade.migration.AbstractAissembleMigration;

import java.io.File;
import java.io.IOException;
import java.util.List;

public class SparkMemoryUpgradeMigration extends AbstractAissembleMigration {
    private static final Logger logger = LoggerFactory.getLogger(SparkMemoryUpgradeMigration.class);

    @Override
    protected boolean shouldExecuteOnFile(File file) {
        try {
            YamlUtils.YamlObject yamlObject = YamlUtils.loadYaml(file);
            YamlUtils.YamlObject sparkApp = yamlObject.getObject("sparkApp");
            if (sparkApp == null || sparkApp.getObject("spec") == null) {
                return false; // Not a Spark application values file
            }
            YamlUtils.YamlObject specSection = sparkApp.getObject("spec");

            // Navigate through the YAML structure to read the driver and executor memory values
            String driverMemory = null;
            String executorMemory = null;
            if (specSection.getObject("driver") != null) {
                driverMemory = specSection.getObject("driver").getString("memory");
            }
            if (specSection.getObject("executor") != null) {
                executorMemory = specSection.getObject("executor").getString("memory");
            }

            // Only migrate when both driver and executor memory still carry the old default
            if ("512m".equals(driverMemory) && "512m".equals(executorMemory)) {
                logger.info("Performing migration for file: {}", file.getName());
                return true;
            } else {
                return false;
            }
        } catch (IOException e) {
            logger.error("Error reading file or parsing YAML: {}", e.getMessage());
            return false; // Skip the file if it cannot be read or parsed
        }
    }

    @Override
    protected boolean performMigration(File file) {
        return migrateValuesFile(file);
    }

    private boolean migrateValuesFile(File file) {
        try {
            List<String> lines = FileUtils.readAllFileLines(file);

            // The old default value to replace
            String oldDefaultValue = "512m";
            // The new value to replace it with
            String newValue = "4096m";

            // Replace the old default on every "memory:" line
            for (int i = 0; i < lines.size(); i++) {
                String line = lines.get(i);
                if (line.stripLeading().startsWith("memory:") && line.contains(oldDefaultValue)) {
                    lines.set(i, line.replace(oldDefaultValue, newValue));
                }
            }

            // Write the updated lines back to the file
            FileUtils.writeFile(file, lines);
            logger.info("Memory values updated successfully in file: {}", file.getName());
            return true; // Indicate successful migration
        } catch (IOException e) {
            logger.error("Error updating memory values in file {}: {}", file.getName(), e.getMessage());
            return false; // Indicate migration failure
        }
    }
}
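
For reference, a minimal standalone sketch of the same line-replacement rule, using only the JDK; the class name and sample lines are illustrative, not part of aiSSEMBLE:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch, not aiSSEMBLE code: demonstrates which lines the
// migration rewrites. Only a line whose first non-blank token is "memory:"
// and that contains the old default is changed; other "512m" values survive.
public class MemoryRuleSketch {
    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "      coreLimit: \"1200m\"",       // kept: not a memory line
                "      memory: \"512m\"",           // rewritten to 4096m
                "      shuffleBuffer: \"512m\""     // kept: hypothetical key, not a memory line
        );
        List<String> migrated = lines.stream()
                .map(line -> line.stripLeading().startsWith("memory:") && line.contains("512m")
                        ? line.replace("512m", "4096m")
                        : line)
                .collect(Collectors.toList());
        migrated.forEach(System.out::println); // prints the post-migration lines
    }
}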
...va/com/boozallen/aissemble/upgrade/migration/v1_7_0/SparkMemoryUpgradeMigrationSteps.java (61 additions, 0 deletions)
@@ -0,0 +1,61 @@
package com.boozallen.aissemble.upgrade.migration.v1_7_0;
/*-
 * #%L
 * aiSSEMBLE::Foundation::Upgrade
 * %%
 * Copyright (C) 2021 Booz Allen
 * %%
 * This software package is licensed under the Booz Allen Public License. All Rights Reserved.
 * #L%
 */

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import com.boozallen.aissemble.upgrade.migration.AbstractMigrationTest;

import io.cucumber.java.en.Given;
import io.cucumber.java.en.Then;
import io.cucumber.java.en.When;

public class SparkMemoryUpgradeMigrationSteps extends AbstractMigrationTest {

    @Given("a project that has a spark application")
    public void a_project_that_has_spark_application() {
        // No setup required; the test files below stand in for the Spark application
    }

    @Given("the base value contains the old memory value")
    public void the_values_yaml_has_old_memory_value() throws IOException {
        testFile = getTestFile("v1_7_0/SparkUpgradeMigration/migration/base-values.yaml");
    }

    @When("the 1.7.0 spark application memory migration executes")
    public void the_value_yaml_migration_executes() {
        performMigration(new SparkMemoryUpgradeMigration());
    }

    @Then("the memory is updated to new value")
    public void the_values_yaml_gets_updated() throws IOException {
        File validationFile = getTestFile("v1_7_0/SparkUpgradeMigration/validation/base-values.yaml");
        File migratedFile = getTestFile("v1_7_0/SparkUpgradeMigration/migration/base-values.yaml");
        assertTrue("The content of the migrated file does not match the validation file",
                FileUtils.contentEqualsIgnoreEOL(migratedFile, validationFile, null));
        assertTrue("Migration did not complete successfully", successful);
    }

    @Given("the base values.yaml does not have default memory values")
    public void the_values_yaml_has_non_default_memory_value() throws IOException {
        testFile = getTestFile("v1_7_0/SparkUpgradeMigration/skip-migration/base-values.yaml");
    }

    @Then("the spark application memory migration is skipped")
    public void the_values_yaml_unchanged() throws IOException {
        assertFalse("The migration should be skipped", shouldExecute);
    }
}
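
These steps bind to the tagged feature below. To run just these scenarios, a tag-filtered invocation is typical; a sketch assuming the repository's standard Maven/Cucumber wiring (the module path is an assumption):

mvn test -pl foundation/foundation-upgrade -Dcucumber.filter.tags="@spark-memory-migration"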
...undation-upgrade/src/test/resources/specifications/v1_7_0/spark-upgrade-migration.feature (13 additions, 0 deletions)
@@ -0,0 +1,13 @@
@spark-memory-migration
Feature: Migrate a spark application to use the new memory value

  Scenario: Migrate a spark application to the new memory value in the base values
    Given a project that has a spark application
    And the base value contains the old memory value
    When the 1.7.0 spark application memory migration executes
    Then the memory is updated to new value

  Scenario: Skip spark application memory migration with non-default values in the base values
    Given a project that has a spark application
    And the base values.yaml does not have default memory values
    When the 1.7.0 spark application memory migration executes
    Then the spark application memory migration is skipped
...ade/src/test/resources/test-files/v1_7_0/SparkUpgradeMigration/migration/base-values.yaml (57 additions, 0 deletions)
@@ -0,0 +1,57 @@
metadata:
  name: pyspark-pipeline
sparkApp:
  spec:
    type: Python
    image: "boozallen/aiops-2977-spark-worker-docker:latest"
    mainApplicationFile: "local:///opt/spark/jobs/pipelines/pyspark-pipeline/pyspark_pipeline_driver.py"
    deps:
      packages:
        - mysql:mysql-connector-java:8.0.30
        - org.apache.hadoop:hadoop-aws:3.3.4
        - com.amazonaws:aws-java-sdk-bundle:1.12.262
      excludePackages: []
    hadoopConf:
      fs.s3a.fast.upload: "true"
      fs.s3a.path.style: "true"
    driver:
      cores: 1
      coreLimit: "1200m"
      memory: "512m"
      env:
        - name: KRAUSENING_BASE
          value: /opt/spark/krausening/base
        - name: AWS_ACCESS_KEY_ID
          value: "123"
        - name: AWS_SECRET_ACCESS_KEY
          value: "456"
        - name: STORAGE_ENDPOINT
          value: "http://s3-local:4566"
        - name: MyFileStore_FS_PROVIDER
          value: "s3"
        - name: MyFileStore_FS_ACCESS_KEY_ID
          value: ""
        - name: MyFileStore_FS_SECRET_ACCESS_KEY
          value: ""
        - name: MyFileStore_FS_SECURE
          value: "false"
        - name: MyFileStoreAsync_FS_PROVIDER
          value: "s3"
        - name: MyFileStoreAsync_FS_ACCESS_KEY_ID
          value: ""
        - name: MyFileStoreAsync_FS_SECRET_ACCESS_KEY
          value: ""
        - name: MyFileStoreAsync_FS_SECURE
          value: "false"
    executor:
      cores: 1
      memory: "512m"
      env:
        - name: KRAUSENING_BASE
          value: /opt/spark/krausening/base
        - name: AWS_ACCESS_KEY_ID
          value: "123"
        - name: AWS_SECRET_ACCESS_KEY
          value: "456"
        - name: STORAGE_ENDPOINT
          value: "http://s3-local:4566"
(The remaining changed files were not rendered in this view.)